Skip to main content

saluki_components/destinations/dsd_debug_log/
mod.rs

1use std::{
2    io::Write,
3    path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytesize::ByteSize;
8use chrono::{DateTime, Utc};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::collections::FastHashMap;
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14    components::{
15        destinations::{Destination, DestinationBuilder, DestinationContext},
16        ComponentContext,
17    },
18    data_model::event::{metric::Metric, Event, EventType},
19};
20use saluki_error::{generic_error, GenericError};
21use serde::Deserialize;
22use stringtheory::MetaString;
23use tokio::select;
24use tracing::{debug, warn};
25use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
26use tracing_rolling_file::{RollingConditionBase, RollingFileAppenderBase};
27
/// Default size at which the active debug log file is rotated.
const DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE: ByteSize = ByteSize::mb(10);
/// Default number of rotated debug log files to keep.
const DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS: usize = 3;
/// Queue capacity of the non-blocking debug log writer; the writer is lossy, so lines submitted
/// while the queue is full are dropped instead of blocking ingestion.
const DEBUG_LOG_WRITER_BUFFER_LINES: usize = 4096;
/// Configuration key watched at runtime to turn metric statistics on and off.
const DOGSTATSD_METRICS_STATS_ENABLE_KEY: &str = "dogstatsd_metrics_stats_enable";
32
/// Serde default for `dogstatsd_logging_enabled`.
const fn default_true() -> bool {
    true
}

/// Serde default for `dogstatsd_log_file_max_size`.
const fn default_log_file_max_size() -> ByteSize {
    DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE
}

/// Serde default for `dogstatsd_log_file_max_rolls`.
const fn default_log_file_max_rolls() -> usize {
    DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS
}
44
/// Configuration for the DogStatsD debug log destination.
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct DogStatsDDebugLogConfiguration {
    /// Whether DogStatsD metric-level statistics are enabled.
    ///
    /// This flag can change at runtime. When `dogstatsd_logging_enabled` is `true`, the debug log
    /// destination is always part of the topology, but it only records and writes metrics while
    /// this flag is `true`; metrics received while it is `false` are dropped.
    ///
    /// Defaults to `false`.
    #[serde(rename = "dogstatsd_metrics_stats_enable", default)]
    metrics_stats_enabled: bool,

    /// Handle to the full configuration, captured by `from_configuration` so the destination can
    /// watch `dogstatsd_metrics_stats_enable` for runtime updates. Never deserialized.
    #[serde(skip)]
    configuration: Option<GenericConfiguration>,

    /// Whether DogStatsD metric-level statistics should also be written to a log file.
    ///
    /// This controls whether the destination is added to the topology.
    /// Defaults to `true`.
    #[serde(rename = "dogstatsd_logging_enabled", default = "default_true")]
    logging_enabled: bool,

    /// Path to the DogStatsD debug log file.
    ///
    /// When the configured value is empty, this defaults to the platform-specific DogStatsD stats
    /// log path used by the core Agent.
    #[serde(rename = "dogstatsd_log_file", default)]
    log_file: PathBuf,

    /// Maximum size of the active debug log file before rotation.
    ///
    /// Defaults to 10 MB.
    #[serde(rename = "dogstatsd_log_file_max_size", default = "default_log_file_max_size")]
    log_file_max_size: ByteSize,

    /// Number of rotated debug log files to keep.
    ///
    /// Defaults to `3`.
    #[serde(rename = "dogstatsd_log_file_max_rolls", default = "default_log_file_max_rolls")]
    log_file_max_rolls: usize,
}
86
#[cfg(test)]
impl PartialEq for DogStatsDDebugLogConfiguration {
    /// Two configurations are equal when every deserialized setting matches.
    ///
    /// The captured `configuration` handle is deliberately left out of the comparison.
    fn eq(&self, other: &Self) -> bool {
        let settings = |cfg: &Self| {
            (
                cfg.metrics_stats_enabled,
                cfg.logging_enabled,
                cfg.log_file.clone(),
                cfg.log_file_max_size,
                cfg.log_file_max_rolls,
            )
        };

        settings(self) == settings(other)
    }
}
97
/// DogStatsD destination that writes metric debug lines to a rotating file.
struct DogStatsDDebugLog {
    /// Path of the active debug log file.
    log_file: PathBuf,
    /// Size at which the active file is rotated.
    log_file_max_size: ByteSize,
    /// Number of rotated files to keep.
    log_file_max_rolls: usize,
    /// Lazily-opened writer; `None` until `ensure_writer` succeeds.
    writer: Option<DebugLogWriter>,
    /// Runtime gate: metrics are dropped while this is `false`. Updated from the configuration
    /// watcher in `run`.
    metrics_stats_enabled: bool,
    /// Configuration handle used to watch `dogstatsd_metrics_stats_enable` for updates.
    configuration: GenericConfiguration,
    /// Per-context occurrence counts and last-seen timestamps. Nothing in this file removes
    /// entries, so the map grows with the number of distinct contexts seen.
    stats: FastHashMap<ContextNoOrigin, MetricSample>,
}
108
/// Non-blocking handle to the rotating debug log file.
struct DebugLogWriter {
    /// Queues lines for the background writer thread.
    writer: NonBlocking,
    /// Keeps the background worker alive for as long as the writer exists. The tests drop the
    /// destination before reading the log files, relying on this guard's drop to flush queued
    /// lines.
    _guard: WorkerGuard,
}
113
/// Running statistics for one metric context.
#[derive(Debug, Default)]
struct MetricSample {
    /// Number of times the context has been written since statistics were first enabled.
    count: u64,
    /// Unix timestamp, in seconds, of the most recent occurrence.
    last_seen: u64,
}
119
/// Statistics key made of only a context's metric name and tags.
///
/// Contexts that differ in any other attribute (origin information, per the type name) share a
/// single entry.
#[derive(Eq, Hash, PartialEq)]
struct ContextNoOrigin {
    /// Metric name.
    name: MetaString,
    /// Metric tags.
    tags: TagSet,
}
125
126impl DogStatsDDebugLogConfiguration {
127    /// Creates a new `DogStatsDDebugLogConfiguration` from the given configuration.
128    ///
129    /// If `dogstatsd_log_file` is empty, `default_log_file_path` is used.
130    pub fn from_configuration(
131        config: &GenericConfiguration, default_log_file_path: PathBuf,
132    ) -> Result<Self, GenericError> {
133        let mut cfg: Self = config.as_typed()?;
134
135        if cfg.log_file.as_os_str().is_empty() {
136            cfg.log_file = default_log_file_path;
137        }
138
139        if cfg.log_file.to_str().is_none() {
140            return Err(generic_error!(
141                "dogstatsd_log_file must be valid UTF-8, got '{}'",
142                cfg.log_file.display()
143            ));
144        }
145
146        cfg.configuration = Some(config.clone());
147
148        Ok(cfg)
149    }
150
151    /// Returns `true` if the debug log destination should be added to the topology.
152    pub const fn enabled(&self) -> bool {
153        self.logging_enabled
154    }
155
156    /// Returns the DogStatsD debug log file path.
157    pub fn log_file(&self) -> &Path {
158        &self.log_file
159    }
160
161    /// Returns the maximum size of the active debug log file before rotation.
162    pub const fn log_file_max_size(&self) -> ByteSize {
163        self.log_file_max_size
164    }
165
166    /// Returns the number of rotated debug log files to keep.
167    pub const fn log_file_max_rolls(&self) -> usize {
168        self.log_file_max_rolls
169    }
170}
171
172impl DogStatsDDebugLog {
173    fn from_configuration(config: &DogStatsDDebugLogConfiguration) -> Result<Self, GenericError> {
174        let mut destination = Self {
175            log_file: config.log_file.clone(),
176            log_file_max_size: config.log_file_max_size,
177            log_file_max_rolls: config.log_file_max_rolls,
178            writer: None,
179            metrics_stats_enabled: config.metrics_stats_enabled,
180            configuration: config
181                .configuration
182                .clone()
183                .expect("configuration must be set via from_configuration"),
184            stats: FastHashMap::default(),
185        };
186
187        if destination.metrics_stats_enabled {
188            destination.ensure_writer()?;
189        }
190
191        Ok(destination)
192    }
193
194    fn process_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
195        if !self.metrics_stats_enabled {
196            return Ok(());
197        }
198
199        self.write_metric(metric)
200    }
201
202    fn write_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
203        self.ensure_writer()?;
204
205        let context = metric.context();
206        let metric_context = ContextNoOrigin {
207            name: context.name().clone(),
208            tags: context.tags().clone(),
209        };
210
211        let timestamp = saluki_common::time::get_coarse_unix_timestamp();
212        let sample = self.stats.entry(metric_context).or_default();
213        sample.count += 1;
214        sample.last_seen = timestamp;
215
216        let writer = self.writer.as_mut().expect("writer should be initialized");
217        writeln!(
218            writer.writer,
219            "Metric Name: {} | Tags: {{{}}} | Count: {} | Last Seen: {}",
220            context.name(),
221            format_tags(context.tags()),
222            sample.count,
223            format_timestamp(sample.last_seen)
224        )
225        .map_err(|e| {
226            generic_error!(
227                "Failed to write to DogStatsD debug log file '{}': {}",
228                self.log_file.display(),
229                e
230            )
231        })
232    }
233
234    fn ensure_writer(&mut self) -> Result<(), GenericError> {
235        if self.writer.is_some() {
236            return Ok(());
237        }
238
239        let appender = RollingFileAppenderBase::new(
240            &self.log_file,
241            RollingConditionBase::new().max_size(self.log_file_max_size.as_u64()),
242            self.log_file_max_rolls,
243        )
244        .map_err(|e| generic_error!("Failed to open dogstatsd_log_file '{}': {}", self.log_file.display(), e))?;
245
246        let (writer, guard) = NonBlockingBuilder::default()
247            .thread_name("dsd-dbg-writer")
248            .buffered_lines_limit(DEBUG_LOG_WRITER_BUFFER_LINES)
249            // Drop debug log lines rather than slow DogStatsD metric ingestion.
250            .lossy(true)
251            .finish(appender);
252
253        self.writer = Some(DebugLogWriter { writer, _guard: guard });
254
255        Ok(())
256    }
257}
258
259#[async_trait]
260impl Destination for DogStatsDDebugLog {
261    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
262        let mut health = context.take_health_handle();
263        health.mark_ready();
264
265        let mut metrics_stats_enabled_watcher =
266            self.configuration.watch_for_updates(DOGSTATSD_METRICS_STATS_ENABLE_KEY);
267
268        loop {
269            select! {
270                _ = health.live() => continue,
271                maybe_events = context.events().next() => match maybe_events {
272                    Some(events) => {
273                        for event in events {
274                            if let Event::Metric(metric) = event {
275                                if let Err(error) = self.process_metric(&metric) {
276                                    warn!(error = %error, "Failed to write DogStatsD debug log line; continuing.");
277                                }
278                            }
279                        }
280                    },
281                    None => break,
282                },
283                (_, maybe_metrics_stats_enabled) = metrics_stats_enabled_watcher.changed::<bool>() => {
284                    if let Some(metrics_stats_enabled) = maybe_metrics_stats_enabled {
285                        self.metrics_stats_enabled = metrics_stats_enabled;
286                        debug!(metrics_stats_enabled, "Updated DogStatsD metrics stats debug logging gate.");
287                    }
288                },
289            }
290        }
291
292        Ok(())
293    }
294}
295
296#[async_trait]
297impl DestinationBuilder for DogStatsDDebugLogConfiguration {
298    fn input_event_type(&self) -> EventType {
299        EventType::Metric
300    }
301
302    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
303        DogStatsDDebugLog::from_configuration(self)
304            .map(|destination| Box::new(destination) as Box<dyn Destination + Send>)
305    }
306}
307
impl MemoryBounds for DogStatsDDebugLogConfiguration {
    /// Declares the destination's minimum memory footprint as a single `DogStatsDDebugLog` value.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        // NOTE(review): this covers only the struct itself. The `stats` map grows with each
        // distinct metric context, and the non-blocking writer queues up to
        // `DEBUG_LOG_WRITER_BUFFER_LINES` lines; neither is accounted for here. Confirm whether
        // a firm bound is wanted.
        builder
            .minimum()
            .with_single_value::<DogStatsDDebugLog>("component struct");
    }
}
315
316fn format_tags(tags: &TagSet) -> String {
317    let mut formatted = String::new();
318
319    for tag in tags {
320        if !formatted.is_empty() {
321            formatted.push(' ');
322        }
323        formatted.push_str(tag.as_str());
324    }
325
326    formatted
327}
328
329fn format_timestamp(timestamp: u64) -> String {
330    i64::try_from(timestamp)
331        .ok()
332        .and_then(|ts| DateTime::<Utc>::from_timestamp(ts, 0))
333        .map(|dt| dt.format("%Y-%m-%d %H:%M:%S +0000 UTC").to_string())
334        .unwrap_or_else(|| timestamp.to_string())
335}
336
#[cfg(test)]
mod tests {
    // Tests cover configuration defaults and wiring, the runtime enable/disable gate, line
    // format, rotation, and error reporting for an unusable log path.
    use std::{
        fs,
        path::{Path, PathBuf},
    };

    use bytesize::ByteSize;
    use datadog_agent_config_testing::config_registry::structs;
    use datadog_agent_config_testing::run_config_smoke_tests;
    use saluki_context::Context;
    use saluki_core::data_model::event::metric::Metric;
    use serde_json::json;
    use tempfile::tempdir;

    use super::{DogStatsDDebugLog, DogStatsDDebugLogConfiguration};
    use crate::config::{DatadogRemapper, KEY_ALIASES};

    /// Fallback log path handed to `from_configuration` throughout these tests.
    fn test_default_log_file_path() -> PathBuf {
        PathBuf::from("/tmp/default-dogstatsd-stats.log")
    }

    /// Loads `raw_json` as a static test configuration and resolves it, panicking on any error.
    async fn deser_config(raw_json: &str) -> DogStatsDDebugLogConfiguration {
        let value = serde_json::from_str(raw_json).expect("test config should be valid JSON");
        let (config, _) = saluki_config::ConfigurationLoader::for_tests(Some(value), None, false).await;

        DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    #[tokio::test]
    async fn defaults_match_core_agent() {
        let config = deser_config("{}").await;

        assert!(config.enabled());
        assert!(!config.metrics_stats_enabled);
        assert!(config.logging_enabled);
        // An empty/missing `dogstatsd_log_file` falls back to the supplied default path.
        assert_eq!(config.log_file(), test_default_log_file_path());
        assert_eq!(config.log_file_max_size(), ByteSize::mb(10));
        assert_eq!(config.log_file_max_rolls(), 3);
    }

    // `dogstatsd_logging_enabled` alone decides topology wiring; `dogstatsd_metrics_stats_enable`
    // is carried through independently.
    #[tokio::test]
    async fn logging_enabled_controls_topology_wiring() {
        let config = deser_config(r#"{ "dogstatsd_metrics_stats_enable": true }"#).await;
        assert!(config.enabled());
        assert!(config.metrics_stats_enabled);

        let config = deser_config(
            r#"{
                "dogstatsd_metrics_stats_enable": true,
                "dogstatsd_logging_enabled": true
            }"#,
        )
        .await;
        assert!(config.enabled());
        assert!(config.metrics_stats_enabled);

        let config = deser_config(
            r#"{
                "dogstatsd_metrics_stats_enable": true,
                "dogstatsd_logging_enabled": false
            }"#,
        )
        .await;
        assert!(!config.enabled());
        assert!(config.metrics_stats_enabled);

        let config = deser_config(
            r#"{
                "dogstatsd_metrics_stats_enable": false,
                "dogstatsd_logging_enabled": true
            }"#,
        )
        .await;
        assert!(config.enabled());
        assert!(!config.metrics_stats_enabled);
    }

    #[tokio::test]
    async fn explicit_log_file_path_is_preserved() {
        let config = deser_config(r#"{ "dogstatsd_log_file": "/tmp/dsd-debug.log" }"#).await;

        assert_eq!(config.log_file(), std::path::Path::new("/tmp/dsd-debug.log"));
    }

    // `log_file_max_rolls` is a `usize`, so a negative value must fail deserialization.
    #[tokio::test]
    async fn negative_log_file_max_rolls_is_rejected() {
        let value = json!({ "dogstatsd_log_file_max_rolls": -1 });
        let (config, _) = saluki_config::ConfigurationLoader::for_tests(Some(value), None, false).await;

        let result = DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path());
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(
            structs::DOGSTATSD_DEBUG_LOG_CONFIGURATION,
            &[],
            json!({}),
            |cfg| {
                DogStatsDDebugLogConfiguration::from_configuration(&cfg, test_default_log_file_path())
                    .expect("DogStatsDDebugLogConfiguration should deserialize")
            },
            KEY_ALIASES,
            DatadogRemapper::new,
        )
        .await
    }

    /// Builds a configuration with statistics and logging enabled, writing to `log_file` with the
    /// given rotation settings.
    async fn test_config(log_file: PathBuf, max_size: ByteSize, max_rolls: usize) -> DogStatsDDebugLogConfiguration {
        let (config, _) = saluki_config::ConfigurationLoader::for_tests(
            Some(json!({
                "dogstatsd_metrics_stats_enable": true,
                "dogstatsd_logging_enabled": true,
                "dogstatsd_log_file": log_file.display().to_string(),
                "dogstatsd_log_file_max_size": max_size.as_u64(),
                "dogstatsd_log_file_max_rolls": max_rolls,
            })),
            None,
            false,
        )
        .await;

        DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    /// Concatenates the active log file and every existing rotated file, oldest roll first.
    fn read_log_files(log_file: &Path, max_rolls: usize) -> String {
        let mut output = String::new();

        for roll in (0..=max_rolls).rev() {
            let path = rolled_path(log_file, roll);
            if path.exists() {
                output.push_str(&fs::read_to_string(&path).expect("debug log file should be readable"));
            }
        }

        output
    }

    /// Path of rotated file number `roll`; roll `0` is the active file, others get a `.N` suffix.
    fn rolled_path(log_file: &Path, roll: usize) -> PathBuf {
        if roll == 0 {
            log_file.to_path_buf()
        } else {
            PathBuf::from(format!("{}.{}", log_file.display(), roll))
        }
    }

    /// A counter metric with two tags, used as the input for the write tests.
    fn tagged_metric() -> Metric {
        let context = Context::from_static_parts("custom.metric", &["env:prod", "service:web"]);
        Metric::counter(context, 1.0)
    }

    #[tokio::test]
    async fn writes_metric_debug_lines_and_updates_count() {
        let tempdir = tempdir().expect("temporary directory should be created");
        let log_file = tempdir.path().join("dogstatsd-stats.log");
        let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;
        let metric = tagged_metric();

        let mut destination =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        destination
            .write_metric(&metric)
            .expect("first metric should be written");
        destination
            .write_metric(&metric)
            .expect("second metric should be written");
        // Dropping the destination drops the writer guard, flushing queued lines to disk.
        drop(destination);

        let output = read_log_files(&log_file, config.log_file_max_rolls());
        let lines = output.lines().collect::<Vec<_>>();

        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("Metric Name: custom.metric"));
        assert!(lines[0].contains("Tags: {env:prod service:web}"));
        assert!(lines[0].contains("Count: 1"));
        assert!(lines[0].contains("Last Seen: "));
        assert!(lines[1].contains("Count: 2"));
    }

    // Drives the gate the way `run` does: wait for a watcher update, apply it, then process a
    // metric. Only the metric processed while enabled should reach the file.
    #[tokio::test]
    async fn dynamically_drops_until_metrics_stats_are_enabled() {
        use std::time::Duration;

        use saluki_config::dynamic::ConfigUpdate;

        let tempdir = tempdir().expect("temporary directory should be created");
        let log_file = tempdir.path().join("dogstatsd-stats.log");
        let (config, sender) = saluki_config::ConfigurationLoader::for_tests(
            Some(json!({
                "dogstatsd_log_file": log_file.display().to_string(),
                "dogstatsd_log_file_max_size": "64kb",
                "dogstatsd_log_file_max_rolls": 3,
                "dogstatsd_logging_enabled": true
            })),
            None,
            true,
        )
        .await;
        let sender = sender.expect("dynamic sender should be present");
        sender
            .send(ConfigUpdate::Snapshot(json!({})))
            .await
            .expect("initial dynamic snapshot should be sent");
        config.ready().await;

        let dsd_config = DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize");
        assert!(dsd_config.enabled());
        assert!(!dsd_config.metrics_stats_enabled);

        let metric = tagged_metric();
        let mut destination =
            DogStatsDDebugLog::from_configuration(&dsd_config).expect("debug log destination should be built");
        let mut watcher = destination
            .configuration
            .watch_for_updates(super::DOGSTATSD_METRICS_STATS_ENABLE_KEY);
        destination
            .process_metric(&metric)
            .expect("disabled metric should be dropped cleanly");
        // While disabled, the writer is never opened, so the file must not exist yet.
        assert!(!log_file.exists());

        sender
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(true),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, maybe_enabled) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive enabled update");
        destination.metrics_stats_enabled = maybe_enabled.expect("metrics stats update should have a new value");

        destination
            .process_metric(&metric)
            .expect("enabled metric should be written");

        sender
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(false),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, maybe_enabled) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive disabled update");
        destination.metrics_stats_enabled = maybe_enabled.expect("metrics stats update should have a new value");

        destination
            .process_metric(&metric)
            .expect("disabled metric should be dropped cleanly");
        drop(destination);

        let output = read_log_files(&log_file, dsd_config.log_file_max_rolls());
        let lines = output.lines().collect::<Vec<_>>();

        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("Metric Name: custom.metric"));
        assert!(lines[0].contains("Count: 1"));
    }

    // The size limit is set to the length of a line without its timestamp, so every full line
    // exceeds it and writes keep forcing rotation; only `max_rolls` rotated files may remain.
    #[tokio::test]
    async fn rotates_log_file_at_configured_size() {
        let tempdir = tempdir().expect("temporary directory should be created");
        let log_file = tempdir.path().join("dogstatsd-stats.log");
        let min_debug_line_len =
            "Metric Name: custom.metric | Tags: {env:prod service:web} | Count: 1 | Last Seen: ".len();
        let config = test_config(log_file.clone(), ByteSize::b(min_debug_line_len as u64), 2).await;
        let metric = tagged_metric();

        let mut destination =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        for _ in 0..12 {
            destination.write_metric(&metric).expect("metric should be written");
        }
        drop(destination);

        assert!(log_file.exists());
        assert!(rolled_path(&log_file, 1).exists());
        assert!(rolled_path(&log_file, 2).exists());
        assert!(!rolled_path(&log_file, 3).exists());

        let output = read_log_files(&log_file, config.log_file_max_rolls());
        assert!(output.contains("Metric Name: custom.metric"));
    }

    // Places the log file under a regular file so opening it fails, then checks the build error
    // names both the config key and the offending path.
    #[tokio::test]
    async fn build_error_mentions_log_file_config_key_and_path() {
        let tempdir = tempdir().expect("temporary directory should be created");
        let blocked_parent = tempdir.path().join("not-a-directory");
        fs::write(&blocked_parent, "not a directory").expect("blocking file should be written");
        let log_file = blocked_parent.join("dogstatsd-stats.log");
        let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;

        let err = match DogStatsDDebugLog::from_configuration(&config) {
            Ok(_) => panic!("build should fail"),
            Err(err) => err,
        };
        let err = err.to_string();

        assert!(err.contains("dogstatsd_log_file"));
        assert!(err.contains(&log_file.display().to_string()));
    }
}