1use std::{
2 io::Write,
3 path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytesize::ByteSize;
8use chrono::{DateTime, Utc};
9use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::collections::FastHashMap;
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14 components::{
15 destinations::{Destination, DestinationBuilder, DestinationContext},
16 ComponentContext,
17 },
18 data_model::event::{metric::Metric, Event, EventType},
19};
20use saluki_error::{generic_error, GenericError};
21use serde::Deserialize;
22use stringtheory::MetaString;
23use tokio::select;
24use tracing::{debug, warn};
25use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
26use tracing_rolling_file::{RollingConditionBase, RollingFileAppenderBase};
27
28const DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE: ByteSize = ByteSize::mb(10);
29const DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS: usize = 3;
30const DEBUG_LOG_WRITER_BUFFER_LINES: usize = 4096;
31const DOGSTATSD_METRICS_STATS_ENABLE_KEY: &str = "dogstatsd_metrics_stats_enable";
32
33const fn default_true() -> bool {
34 true
35}
36
37const fn default_log_file_max_size() -> ByteSize {
38 DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE
39}
40
41const fn default_log_file_max_rolls() -> usize {
42 DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS
43}
44
45#[derive(Clone, Debug, Deserialize)]
47#[cfg_attr(test, derive(serde::Serialize))]
48pub struct DogStatsDDebugLogConfiguration {
49 #[serde(rename = "dogstatsd_metrics_stats_enable", default)]
56 metrics_stats_enabled: bool,
57
58 #[serde(skip)]
59 configuration: Option<GenericConfiguration>,
60
61 #[serde(rename = "dogstatsd_logging_enabled", default = "default_true")]
66 logging_enabled: bool,
67
68 #[serde(rename = "dogstatsd_log_file", default)]
72 log_file: PathBuf,
73
74 #[serde(rename = "dogstatsd_log_file_max_size", default = "default_log_file_max_size")]
78 log_file_max_size: ByteSize,
79
80 #[serde(rename = "dogstatsd_log_file_max_rolls", default = "default_log_file_max_rolls")]
84 log_file_max_rolls: usize,
85}
86
87#[cfg(test)]
88impl PartialEq for DogStatsDDebugLogConfiguration {
89 fn eq(&self, other: &Self) -> bool {
90 self.metrics_stats_enabled == other.metrics_stats_enabled
91 && self.logging_enabled == other.logging_enabled
92 && self.log_file == other.log_file
93 && self.log_file_max_size == other.log_file_max_size
94 && self.log_file_max_rolls == other.log_file_max_rolls
95 }
96}
97
98struct DogStatsDDebugLog {
100 log_file: PathBuf,
101 log_file_max_size: ByteSize,
102 log_file_max_rolls: usize,
103 writer: Option<DebugLogWriter>,
104 metrics_stats_enabled: bool,
105 configuration: GenericConfiguration,
106 stats: FastHashMap<ContextNoOrigin, MetricSample>,
107}
108
109struct DebugLogWriter {
110 writer: NonBlocking,
111 _guard: WorkerGuard,
112}
113
114#[derive(Debug, Default)]
115struct MetricSample {
116 count: u64,
117 last_seen: u64,
118}
119
120#[derive(Eq, Hash, PartialEq)]
121struct ContextNoOrigin {
122 name: MetaString,
123 tags: TagSet,
124}
125
126impl DogStatsDDebugLogConfiguration {
127 pub fn from_configuration(
131 config: &GenericConfiguration, default_log_file_path: PathBuf,
132 ) -> Result<Self, GenericError> {
133 let mut cfg: Self = config.as_typed()?;
134
135 if cfg.log_file.as_os_str().is_empty() {
136 cfg.log_file = default_log_file_path;
137 }
138
139 if cfg.log_file.to_str().is_none() {
140 return Err(generic_error!(
141 "dogstatsd_log_file must be valid UTF-8, got '{}'",
142 cfg.log_file.display()
143 ));
144 }
145
146 cfg.configuration = Some(config.clone());
147
148 Ok(cfg)
149 }
150
151 pub const fn enabled(&self) -> bool {
153 self.logging_enabled
154 }
155
156 pub fn log_file(&self) -> &Path {
158 &self.log_file
159 }
160
161 pub const fn log_file_max_size(&self) -> ByteSize {
163 self.log_file_max_size
164 }
165
166 pub const fn log_file_max_rolls(&self) -> usize {
168 self.log_file_max_rolls
169 }
170}
171
172impl DogStatsDDebugLog {
173 fn from_configuration(config: &DogStatsDDebugLogConfiguration) -> Result<Self, GenericError> {
174 let mut destination = Self {
175 log_file: config.log_file.clone(),
176 log_file_max_size: config.log_file_max_size,
177 log_file_max_rolls: config.log_file_max_rolls,
178 writer: None,
179 metrics_stats_enabled: config.metrics_stats_enabled,
180 configuration: config
181 .configuration
182 .clone()
183 .expect("configuration must be set via from_configuration"),
184 stats: FastHashMap::default(),
185 };
186
187 if destination.metrics_stats_enabled {
188 destination.ensure_writer()?;
189 }
190
191 Ok(destination)
192 }
193
194 fn process_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
195 if !self.metrics_stats_enabled {
196 return Ok(());
197 }
198
199 self.write_metric(metric)
200 }
201
202 fn write_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
203 self.ensure_writer()?;
204
205 let context = metric.context();
206 let metric_context = ContextNoOrigin {
207 name: context.name().clone(),
208 tags: context.tags().clone(),
209 };
210
211 let timestamp = saluki_common::time::get_coarse_unix_timestamp();
212 let sample = self.stats.entry(metric_context).or_default();
213 sample.count += 1;
214 sample.last_seen = timestamp;
215
216 let writer = self.writer.as_mut().expect("writer should be initialized");
217 writeln!(
218 writer.writer,
219 "Metric Name: {} | Tags: {{{}}} | Count: {} | Last Seen: {}",
220 context.name(),
221 format_tags(context.tags()),
222 sample.count,
223 format_timestamp(sample.last_seen)
224 )
225 .map_err(|e| {
226 generic_error!(
227 "Failed to write to DogStatsD debug log file '{}': {}",
228 self.log_file.display(),
229 e
230 )
231 })
232 }
233
234 fn ensure_writer(&mut self) -> Result<(), GenericError> {
235 if self.writer.is_some() {
236 return Ok(());
237 }
238
239 let appender = RollingFileAppenderBase::new(
240 &self.log_file,
241 RollingConditionBase::new().max_size(self.log_file_max_size.as_u64()),
242 self.log_file_max_rolls,
243 )
244 .map_err(|e| generic_error!("Failed to open dogstatsd_log_file '{}': {}", self.log_file.display(), e))?;
245
246 let (writer, guard) = NonBlockingBuilder::default()
247 .thread_name("dsd-dbg-writer")
248 .buffered_lines_limit(DEBUG_LOG_WRITER_BUFFER_LINES)
249 .lossy(true)
251 .finish(appender);
252
253 self.writer = Some(DebugLogWriter { writer, _guard: guard });
254
255 Ok(())
256 }
257}
258
259#[async_trait]
260impl Destination for DogStatsDDebugLog {
261 async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
262 let mut health = context.take_health_handle();
263 health.mark_ready();
264
265 let mut metrics_stats_enabled_watcher =
266 self.configuration.watch_for_updates(DOGSTATSD_METRICS_STATS_ENABLE_KEY);
267
268 loop {
269 select! {
270 _ = health.live() => continue,
271 maybe_events = context.events().next() => match maybe_events {
272 Some(events) => {
273 for event in events {
274 if let Event::Metric(metric) = event {
275 if let Err(error) = self.process_metric(&metric) {
276 warn!(error = %error, "Failed to write DogStatsD debug log line; continuing.");
277 }
278 }
279 }
280 },
281 None => break,
282 },
283 (_, maybe_metrics_stats_enabled) = metrics_stats_enabled_watcher.changed::<bool>() => {
284 if let Some(metrics_stats_enabled) = maybe_metrics_stats_enabled {
285 self.metrics_stats_enabled = metrics_stats_enabled;
286 debug!(metrics_stats_enabled, "Updated DogStatsD metrics stats debug logging gate.");
287 }
288 },
289 }
290 }
291
292 Ok(())
293 }
294}
295
296#[async_trait]
297impl DestinationBuilder for DogStatsDDebugLogConfiguration {
298 fn input_event_type(&self) -> EventType {
299 EventType::Metric
300 }
301
302 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
303 DogStatsDDebugLog::from_configuration(self)
304 .map(|destination| Box::new(destination) as Box<dyn Destination + Send>)
305 }
306}
307
308impl MemoryBounds for DogStatsDDebugLogConfiguration {
309 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
310 builder
311 .minimum()
312 .with_single_value::<DogStatsDDebugLog>("component struct");
313 }
314}
315
316fn format_tags(tags: &TagSet) -> String {
317 let mut formatted = String::new();
318
319 for tag in tags {
320 if !formatted.is_empty() {
321 formatted.push(' ');
322 }
323 formatted.push_str(tag.as_str());
324 }
325
326 formatted
327}
328
329fn format_timestamp(timestamp: u64) -> String {
330 i64::try_from(timestamp)
331 .ok()
332 .and_then(|ts| DateTime::<Utc>::from_timestamp(ts, 0))
333 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S +0000 UTC").to_string())
334 .unwrap_or_else(|| timestamp.to_string())
335}
336
337#[cfg(test)]
338mod tests {
339 use std::{
340 fs,
341 path::{Path, PathBuf},
342 };
343
344 use bytesize::ByteSize;
345 use datadog_agent_config_testing::config_registry::structs;
346 use datadog_agent_config_testing::run_config_smoke_tests;
347 use saluki_context::Context;
348 use saluki_core::data_model::event::metric::Metric;
349 use serde_json::json;
350 use tempfile::tempdir;
351
352 use super::{DogStatsDDebugLog, DogStatsDDebugLogConfiguration};
353 use crate::config::{DatadogRemapper, KEY_ALIASES};
354
355 fn test_default_log_file_path() -> PathBuf {
356 PathBuf::from("/tmp/default-dogstatsd-stats.log")
357 }
358
359 async fn deser_config(raw_json: &str) -> DogStatsDDebugLogConfiguration {
360 let value = serde_json::from_str(raw_json).expect("test config should be valid JSON");
361 let (config, _) = saluki_config::ConfigurationLoader::for_tests(Some(value), None, false).await;
362
363 DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
364 .expect("DogStatsDDebugLogConfiguration should deserialize")
365 }
366
367 #[tokio::test]
368 async fn defaults_match_core_agent() {
369 let config = deser_config("{}").await;
370
371 assert!(config.enabled());
372 assert!(!config.metrics_stats_enabled);
373 assert!(config.logging_enabled);
374 assert_eq!(config.log_file(), test_default_log_file_path());
375 assert_eq!(config.log_file_max_size(), ByteSize::mb(10));
376 assert_eq!(config.log_file_max_rolls(), 3);
377 }
378
379 #[tokio::test]
380 async fn logging_enabled_controls_topology_wiring() {
381 let config = deser_config(r#"{ "dogstatsd_metrics_stats_enable": true }"#).await;
382 assert!(config.enabled());
383 assert!(config.metrics_stats_enabled);
384
385 let config = deser_config(
386 r#"{
387 "dogstatsd_metrics_stats_enable": true,
388 "dogstatsd_logging_enabled": true
389 }"#,
390 )
391 .await;
392 assert!(config.enabled());
393 assert!(config.metrics_stats_enabled);
394
395 let config = deser_config(
396 r#"{
397 "dogstatsd_metrics_stats_enable": true,
398 "dogstatsd_logging_enabled": false
399 }"#,
400 )
401 .await;
402 assert!(!config.enabled());
403 assert!(config.metrics_stats_enabled);
404
405 let config = deser_config(
406 r#"{
407 "dogstatsd_metrics_stats_enable": false,
408 "dogstatsd_logging_enabled": true
409 }"#,
410 )
411 .await;
412 assert!(config.enabled());
413 assert!(!config.metrics_stats_enabled);
414 }
415
416 #[tokio::test]
417 async fn explicit_log_file_path_is_preserved() {
418 let config = deser_config(r#"{ "dogstatsd_log_file": "/tmp/dsd-debug.log" }"#).await;
419
420 assert_eq!(config.log_file(), std::path::Path::new("/tmp/dsd-debug.log"));
421 }
422
423 #[tokio::test]
424 async fn negative_log_file_max_rolls_is_rejected() {
425 let value = json!({ "dogstatsd_log_file_max_rolls": -1 });
426 let (config, _) = saluki_config::ConfigurationLoader::for_tests(Some(value), None, false).await;
427
428 let result = DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path());
429 assert!(result.is_err());
430 }
431
432 #[tokio::test]
433 async fn smoke_test() {
434 run_config_smoke_tests(
435 structs::DOGSTATSD_DEBUG_LOG_CONFIGURATION,
436 &[],
437 json!({}),
438 |cfg| {
439 DogStatsDDebugLogConfiguration::from_configuration(&cfg, test_default_log_file_path())
440 .expect("DogStatsDDebugLogConfiguration should deserialize")
441 },
442 KEY_ALIASES,
443 DatadogRemapper::new,
444 )
445 .await
446 }
447
448 async fn test_config(log_file: PathBuf, max_size: ByteSize, max_rolls: usize) -> DogStatsDDebugLogConfiguration {
449 let (config, _) = saluki_config::ConfigurationLoader::for_tests(
450 Some(json!({
451 "dogstatsd_metrics_stats_enable": true,
452 "dogstatsd_logging_enabled": true,
453 "dogstatsd_log_file": log_file.display().to_string(),
454 "dogstatsd_log_file_max_size": max_size.as_u64(),
455 "dogstatsd_log_file_max_rolls": max_rolls,
456 })),
457 None,
458 false,
459 )
460 .await;
461
462 DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
463 .expect("DogStatsDDebugLogConfiguration should deserialize")
464 }
465
466 fn read_log_files(log_file: &Path, max_rolls: usize) -> String {
467 let mut output = String::new();
468
469 for roll in (0..=max_rolls).rev() {
470 let path = rolled_path(log_file, roll);
471 if path.exists() {
472 output.push_str(&fs::read_to_string(&path).expect("debug log file should be readable"));
473 }
474 }
475
476 output
477 }
478
479 fn rolled_path(log_file: &Path, roll: usize) -> PathBuf {
480 if roll == 0 {
481 log_file.to_path_buf()
482 } else {
483 PathBuf::from(format!("{}.{}", log_file.display(), roll))
484 }
485 }
486
487 fn tagged_metric() -> Metric {
488 let context = Context::from_static_parts("custom.metric", &["env:prod", "service:web"]);
489 Metric::counter(context, 1.0)
490 }
491
492 #[tokio::test]
493 async fn writes_metric_debug_lines_and_updates_count() {
494 let tempdir = tempdir().expect("temporary directory should be created");
495 let log_file = tempdir.path().join("dogstatsd-stats.log");
496 let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;
497 let metric = tagged_metric();
498
499 let mut destination =
500 DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
501 destination
502 .write_metric(&metric)
503 .expect("first metric should be written");
504 destination
505 .write_metric(&metric)
506 .expect("second metric should be written");
507 drop(destination);
508
509 let output = read_log_files(&log_file, config.log_file_max_rolls());
510 let lines = output.lines().collect::<Vec<_>>();
511
512 assert_eq!(lines.len(), 2);
513 assert!(lines[0].contains("Metric Name: custom.metric"));
514 assert!(lines[0].contains("Tags: {env:prod service:web}"));
515 assert!(lines[0].contains("Count: 1"));
516 assert!(lines[0].contains("Last Seen: "));
517 assert!(lines[1].contains("Count: 2"));
518 }
519
520 #[tokio::test]
521 async fn dynamically_drops_until_metrics_stats_are_enabled() {
522 use std::time::Duration;
523
524 use saluki_config::dynamic::ConfigUpdate;
525
526 let tempdir = tempdir().expect("temporary directory should be created");
527 let log_file = tempdir.path().join("dogstatsd-stats.log");
528 let (config, sender) = saluki_config::ConfigurationLoader::for_tests(
529 Some(json!({
530 "dogstatsd_log_file": log_file.display().to_string(),
531 "dogstatsd_log_file_max_size": "64kb",
532 "dogstatsd_log_file_max_rolls": 3,
533 "dogstatsd_logging_enabled": true
534 })),
535 None,
536 true,
537 )
538 .await;
539 let sender = sender.expect("dynamic sender should be present");
540 sender
541 .send(ConfigUpdate::Snapshot(json!({})))
542 .await
543 .expect("initial dynamic snapshot should be sent");
544 config.ready().await;
545
546 let dsd_config = DogStatsDDebugLogConfiguration::from_configuration(&config, test_default_log_file_path())
547 .expect("DogStatsDDebugLogConfiguration should deserialize");
548 assert!(dsd_config.enabled());
549 assert!(!dsd_config.metrics_stats_enabled);
550
551 let metric = tagged_metric();
552 let mut destination =
553 DogStatsDDebugLog::from_configuration(&dsd_config).expect("debug log destination should be built");
554 let mut watcher = destination
555 .configuration
556 .watch_for_updates(super::DOGSTATSD_METRICS_STATS_ENABLE_KEY);
557 destination
558 .process_metric(&metric)
559 .expect("disabled metric should be dropped cleanly");
560 assert!(!log_file.exists());
561
562 sender
563 .send(ConfigUpdate::Partial {
564 key: "dogstatsd_metrics_stats_enable".to_string(),
565 value: json!(true),
566 })
567 .await
568 .expect("dynamic update should be sent");
569
570 let (_, maybe_enabled) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
571 .await
572 .expect("metrics stats flag should receive enabled update");
573 destination.metrics_stats_enabled = maybe_enabled.expect("metrics stats update should have a new value");
574
575 destination
576 .process_metric(&metric)
577 .expect("enabled metric should be written");
578
579 sender
580 .send(ConfigUpdate::Partial {
581 key: "dogstatsd_metrics_stats_enable".to_string(),
582 value: json!(false),
583 })
584 .await
585 .expect("dynamic update should be sent");
586
587 let (_, maybe_enabled) = tokio::time::timeout(Duration::from_secs(2), watcher.changed::<bool>())
588 .await
589 .expect("metrics stats flag should receive disabled update");
590 destination.metrics_stats_enabled = maybe_enabled.expect("metrics stats update should have a new value");
591
592 destination
593 .process_metric(&metric)
594 .expect("disabled metric should be dropped cleanly");
595 drop(destination);
596
597 let output = read_log_files(&log_file, dsd_config.log_file_max_rolls());
598 let lines = output.lines().collect::<Vec<_>>();
599
600 assert_eq!(lines.len(), 1);
601 assert!(lines[0].contains("Metric Name: custom.metric"));
602 assert!(lines[0].contains("Count: 1"));
603 }
604
605 #[tokio::test]
606 async fn rotates_log_file_at_configured_size() {
607 let tempdir = tempdir().expect("temporary directory should be created");
608 let log_file = tempdir.path().join("dogstatsd-stats.log");
609 let min_debug_line_len =
610 "Metric Name: custom.metric | Tags: {env:prod service:web} | Count: 1 | Last Seen: ".len();
611 let config = test_config(log_file.clone(), ByteSize::b(min_debug_line_len as u64), 2).await;
612 let metric = tagged_metric();
613
614 let mut destination =
615 DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
616 for _ in 0..12 {
617 destination.write_metric(&metric).expect("metric should be written");
618 }
619 drop(destination);
620
621 assert!(log_file.exists());
622 assert!(rolled_path(&log_file, 1).exists());
623 assert!(rolled_path(&log_file, 2).exists());
624 assert!(!rolled_path(&log_file, 3).exists());
625
626 let output = read_log_files(&log_file, config.log_file_max_rolls());
627 assert!(output.contains("Metric Name: custom.metric"));
628 }
629
630 #[tokio::test]
631 async fn build_error_mentions_log_file_config_key_and_path() {
632 let tempdir = tempdir().expect("temporary directory should be created");
633 let blocked_parent = tempdir.path().join("not-a-directory");
634 fs::write(&blocked_parent, "not a directory").expect("blocking file should be written");
635 let log_file = blocked_parent.join("dogstatsd-stats.log");
636 let config = test_config(log_file.clone(), ByteSize::kb(64), 3).await;
637
638 let err = match DogStatsDDebugLog::from_configuration(&config) {
639 Ok(_) => panic!("build should fail"),
640 Err(err) => err,
641 };
642 let err = err.to_string();
643
644 assert!(err.contains("dogstatsd_log_file"));
645 assert!(err.contains(&log_file.display().to_string()));
646 }
647}