1use std::{
2 io::Write,
3 path::{Path, PathBuf},
4};
5
6use async_trait::async_trait;
7use bytesize::ByteSize;
8use chrono::{DateTime, Utc};
9use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
10use saluki_common::collections::FastHashMap;
11use saluki_config::GenericConfiguration;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14 components::{
15 destinations::{Destination, DestinationBuilder, DestinationContext},
16 ComponentContext,
17 },
18 data_model::event::{metric::Metric, Event, EventType},
19};
20use saluki_error::{generic_error, GenericError};
21use serde::Deserialize;
22use stringtheory::MetaString;
23use tokio::select;
24use tracing::{debug, warn};
25use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
26use tracing_rolling_file::{RollingConditionBase, RollingFileAppenderBase};
27
/// Default for `dogstatsd_log_file_max_size`: `ByteSize::mb(10)`.
const DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE: ByteSize = ByteSize::mb(10);
/// Default for `dogstatsd_log_file_max_rolls`.
const DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS: usize = 3;
/// Buffered-lines limit handed to the non-blocking writer that backs the debug log file.
const DEBUG_LOG_WRITER_BUFFER_LINES: usize = 4096;
/// Configuration key that is watched at runtime to toggle per-metric debug logging.
const DOGSTATSD_METRICS_STATS_ENABLE_KEY: &str = "dogstatsd_metrics_stats_enable";
32
/// Serde default helper that yields `true`; used as the default for `dogstatsd_logging_enabled`.
const fn default_true() -> bool {
    true
}
36
/// Serde default helper for `dogstatsd_log_file_max_size`.
const fn default_log_file_max_size() -> ByteSize {
    DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE
}
40
/// Serde default helper for `dogstatsd_log_file_max_rolls`.
const fn default_log_file_max_rolls() -> usize {
    DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS
}
44
/// Configuration for the DogStatsD debug log destination.
///
/// Every setting is deserialized from a `dogstatsd_*` key. Instances are expected to be created through
/// `DogStatsDDebugLogConfiguration::from_configuration`, which additionally fills in the default log file path and
/// retains a handle to the source configuration.
#[derive(Clone, Debug, Deserialize)]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct DogStatsDDebugLogConfiguration {
    /// Whether a debug line is written for each processed metric.
    ///
    /// Read from `dogstatsd_metrics_stats_enable`; falls back to `bool::default()` when the key is absent. The
    /// running destination also watches this key for updates.
    #[serde(rename = "dogstatsd_metrics_stats_enable", default)]
    metrics_stats_enabled: bool,

    /// Handle to the configuration this value was built from.
    ///
    /// Never (de)serialized; it is populated by `from_configuration` and is `None` for a value produced by plain
    /// deserialization.
    #[serde(skip)]
    configuration: Option<GenericConfiguration>,

    /// Whether DogStatsD debug logging is enabled at all; this is the value reported by `enabled()`.
    ///
    /// Read from `dogstatsd_logging_enabled`; defaults to `true`.
    // NOTE(review): presumably callers use `enabled()` to decide whether to wire this destination into the
    // topology — confirm against the call site.
    #[serde(rename = "dogstatsd_logging_enabled", default = "default_true")]
    logging_enabled: bool,

    /// Path of the debug log file.
    ///
    /// Read from `dogstatsd_log_file`; an empty path is replaced by the caller-supplied default in
    /// `from_configuration`.
    #[serde(rename = "dogstatsd_log_file", default)]
    log_file: PathBuf,

    /// Size threshold used as the rolling condition for the log file.
    ///
    /// Read from `dogstatsd_log_file_max_size`; defaults to `DEFAULT_DOGSTATSD_LOG_FILE_MAX_SIZE`.
    #[serde(rename = "dogstatsd_log_file_max_size", default = "default_log_file_max_size")]
    log_file_max_size: ByteSize,

    /// Number of rolled log files handed to the rolling file appender.
    ///
    /// Read from `dogstatsd_log_file_max_rolls`; defaults to `DEFAULT_DOGSTATSD_LOG_FILE_MAX_ROLLS`.
    #[serde(rename = "dogstatsd_log_file_max_rolls", default = "default_log_file_max_rolls")]
    log_file_max_rolls: usize,
}
86
/// Test-only equality over the deserialized settings.
///
/// The retained `configuration` handle is not part of the comparison.
#[cfg(test)]
impl PartialEq for DogStatsDDebugLogConfiguration {
    fn eq(&self, other: &Self) -> bool {
        // Compare the settings as one tuple per side; the path is borrowed, the rest is copied.
        let lhs = (
            self.metrics_stats_enabled,
            self.logging_enabled,
            &self.log_file,
            self.log_file_max_size,
            self.log_file_max_rolls,
        );
        let rhs = (
            other.metrics_stats_enabled,
            other.logging_enabled,
            &other.log_file,
            other.log_file_max_size,
            other.log_file_max_rolls,
        );

        lhs == rhs
    }
}
97
/// Destination that writes one debug line per received metric to a rolling log file.
struct DogStatsDDebugLog {
    /// Path of the debug log file.
    log_file: PathBuf,
    /// Size threshold used as the rolling condition for the log file.
    log_file_max_size: ByteSize,
    /// Number of rolled files handed to the rolling file appender.
    log_file_max_rolls: usize,
    /// Log writer; `None` until `ensure_writer` has opened the file.
    writer: Option<DebugLogWriter>,
    /// Gate for writing metric debug lines; updated at runtime from the watched configuration key.
    metrics_stats_enabled: bool,
    /// Source configuration, used to watch for updates to the gate.
    configuration: GenericConfiguration,
    /// Per-context sample statistics, keyed by metric name and tags.
    // NOTE(review): entries are only ever inserted in this file, so the map grows with the number of distinct
    // contexts seen — confirm this is acceptable.
    stats: FastHashMap<ContextNoOrigin, MetricSample>,
}
108
/// Non-blocking log writer bundled with the guard that was created alongside it.
struct DebugLogWriter {
    /// Non-blocking handle that debug lines are written to.
    writer: NonBlocking,
    /// Guard returned together with the writer; held for as long as the writer is.
    // NOTE(review): presumably dropping the guard flushes pending lines and stops the writer thread — confirm
    // against the tracing_appender documentation.
    _guard: WorkerGuard,
}
113
/// Running statistics for a single metric context.
#[derive(Debug, Default)]
struct MetricSample {
    /// Number of times the context has been written to the debug log.
    count: u64,
    /// Coarse Unix timestamp (whole seconds) of the most recent write for the context.
    last_seen: u64,
}
119
/// Key for the per-context statistics map: a metric's name and tags only.
// NOTE(review): as the name suggests, this presumably exists to exclude origin information from the key — confirm.
#[derive(Eq, Hash, PartialEq)]
struct ContextNoOrigin {
    /// Metric name.
    name: MetaString,
    /// Metric tags.
    tags: TagSet,
}
125
126impl DogStatsDDebugLogConfiguration {
127 pub fn from_configuration(
131 config: &GenericConfiguration, default_log_file_path: PathBuf,
132 ) -> Result<Self, GenericError> {
133 let mut cfg: Self = config.as_typed()?;
134
135 if cfg.log_file.as_os_str().is_empty() {
136 cfg.log_file = default_log_file_path;
137 }
138
139 if cfg.log_file.to_str().is_none() {
140 return Err(generic_error!(
141 "dogstatsd_log_file must be valid UTF-8, got '{}'",
142 cfg.log_file.display()
143 ));
144 }
145
146 cfg.configuration = Some(config.clone());
147
148 Ok(cfg)
149 }
150
151 pub const fn enabled(&self) -> bool {
153 self.logging_enabled
154 }
155
156 pub fn log_file(&self) -> &Path {
158 &self.log_file
159 }
160
161 pub const fn log_file_max_size(&self) -> ByteSize {
163 self.log_file_max_size
164 }
165
166 pub const fn log_file_max_rolls(&self) -> usize {
168 self.log_file_max_rolls
169 }
170}
171
172impl DogStatsDDebugLog {
173 fn from_configuration(config: &DogStatsDDebugLogConfiguration) -> Result<Self, GenericError> {
174 let mut destination = Self {
175 log_file: config.log_file.clone(),
176 log_file_max_size: config.log_file_max_size,
177 log_file_max_rolls: config.log_file_max_rolls,
178 writer: None,
179 metrics_stats_enabled: config.metrics_stats_enabled,
180 configuration: config
181 .configuration
182 .clone()
183 .expect("configuration must be set via from_configuration"),
184 stats: FastHashMap::default(),
185 };
186
187 if destination.metrics_stats_enabled {
188 destination.ensure_writer()?;
189 }
190
191 Ok(destination)
192 }
193
194 fn process_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
195 if !self.metrics_stats_enabled {
196 return Ok(());
197 }
198
199 self.write_metric(metric)
200 }
201
202 fn write_metric(&mut self, metric: &Metric) -> Result<(), GenericError> {
203 self.ensure_writer()?;
204
205 let context = metric.context();
206 let metric_context = ContextNoOrigin {
207 name: context.name().clone(),
208 tags: context.tags().clone(),
209 };
210
211 let timestamp = saluki_common::time::get_coarse_unix_timestamp();
212 let sample = self.stats.entry(metric_context).or_default();
213 sample.count += 1;
214 sample.last_seen = timestamp;
215
216 let writer = self.writer.as_mut().expect("writer should be initialized");
217 writeln!(
218 writer.writer,
219 "Metric Name: {} | Tags: {{{}}} | Count: {} | Last Seen: {}",
220 context.name(),
221 format_tags(context.tags()),
222 sample.count,
223 format_timestamp(sample.last_seen)
224 )
225 .map_err(|e| {
226 generic_error!(
227 "Failed to write to DogStatsD debug log file '{}': {}",
228 self.log_file.display(),
229 e
230 )
231 })
232 }
233
234 fn ensure_writer(&mut self) -> Result<(), GenericError> {
235 if self.writer.is_some() {
236 return Ok(());
237 }
238
239 let appender = RollingFileAppenderBase::new(
240 &self.log_file,
241 RollingConditionBase::new().max_size(self.log_file_max_size.as_u64()),
242 self.log_file_max_rolls,
243 )
244 .map_err(|e| generic_error!("Failed to open dogstatsd_log_file '{}': {}", self.log_file.display(), e))?;
245
246 let (writer, guard) = NonBlockingBuilder::default()
247 .thread_name("dsd-dbg-writer")
248 .buffered_lines_limit(DEBUG_LOG_WRITER_BUFFER_LINES)
249 .lossy(true)
251 .finish(appender);
252
253 self.writer = Some(DebugLogWriter { writer, _guard: guard });
254
255 Ok(())
256 }
257}
258
259#[async_trait]
260impl Destination for DogStatsDDebugLog {
261 async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
262 let mut health = context.take_health_handle();
263 health.mark_ready();
264
265 let mut metrics_stats_enabled_watcher =
266 self.configuration.watch_for_updates(DOGSTATSD_METRICS_STATS_ENABLE_KEY);
267
268 loop {
269 select! {
270 _ = health.live() => continue,
271 maybe_events = context.events().next() => match maybe_events {
272 Some(events) => {
273 for event in events {
274 if let Event::Metric(metric) = event {
275 if let Err(error) = self.process_metric(&metric) {
276 warn!(error = %error, "Failed to write DogStatsD debug log line; continuing.");
277 }
278 }
279 }
280 },
281 None => break,
282 },
283 (_, maybe_metrics_stats_enabled) = metrics_stats_enabled_watcher.changed::<bool>() => {
284 if let Some(metrics_stats_enabled) = maybe_metrics_stats_enabled {
285 self.metrics_stats_enabled = metrics_stats_enabled;
286 debug!(metrics_stats_enabled, "Updated DogStatsD metrics stats debug logging gate.");
287 }
288 },
289 }
290 }
291
292 Ok(())
293 }
294}
295
296#[async_trait]
297impl DestinationBuilder for DogStatsDDebugLogConfiguration {
298 fn input_event_type(&self) -> EventType {
299 EventType::Metric
300 }
301
302 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
303 DogStatsDDebugLog::from_configuration(self)
304 .map(|destination| Box::new(destination) as Box<dyn Destination + Send>)
305 }
306}
307
impl MemoryBounds for DogStatsDDebugLogConfiguration {
    /// Declares the memory bounds of the destination built from this configuration.
    ///
    /// Only a minimum bound is declared, covering a single `DogStatsDDebugLog` value.
    // NOTE(review): the per-context `stats` map held by the destination is not accounted for here — confirm this
    // is intended.
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            .with_single_value::<DogStatsDDebugLog>("component struct");
    }
}
315
316fn format_tags(tags: &TagSet) -> String {
317 let mut formatted = String::new();
318
319 for tag in tags {
320 if !formatted.is_empty() {
321 formatted.push(' ');
322 }
323 formatted.push_str(tag.as_str());
324 }
325
326 formatted
327}
328
329fn format_timestamp(timestamp: u64) -> String {
330 i64::try_from(timestamp)
331 .ok()
332 .and_then(|ts| DateTime::<Utc>::from_timestamp(ts, 0))
333 .map(|dt| dt.format("%Y-%m-%d %H:%M:%S +0000 UTC").to_string())
334 .unwrap_or_else(|| timestamp.to_string())
335}
336
#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::{Path, PathBuf},
    };

    use bytesize::ByteSize;
    use saluki_context::Context;
    use saluki_core::data_model::event::metric::Metric;
    use serde_json::json;
    use tempfile::tempdir;

    use super::{DogStatsDDebugLog, DogStatsDDebugLogConfiguration};
    use crate::config_registry::structs;
    use crate::config_registry::test_support::run_config_smoke_tests;

    /// Fallback log file path handed to `from_configuration` by every test.
    fn test_default_log_file_path() -> PathBuf {
        Path::new("/tmp/default-dogstatsd-stats.log").to_path_buf()
    }

    /// Loads `raw_json` as a test configuration and builds the debug log configuration from it.
    async fn deser_config(raw_json: &str) -> DogStatsDDebugLogConfiguration {
        let parsed = serde_json::from_str(raw_json).expect("test config should be valid JSON");
        let (generic_config, _) = saluki_config::ConfigurationLoader::for_tests(Some(parsed), None, false).await;

        let built = DogStatsDDebugLogConfiguration::from_configuration(&generic_config, test_default_log_file_path());
        built.expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    /// An empty configuration yields the documented defaults.
    #[tokio::test]
    async fn defaults_match_core_agent() {
        let defaults = deser_config("{}").await;

        assert!(defaults.enabled());
        assert!(!defaults.metrics_stats_enabled);
        assert!(defaults.logging_enabled);
        assert_eq!(defaults.log_file(), test_default_log_file_path());
        assert_eq!(defaults.log_file_max_size(), ByteSize::mb(10));
        assert_eq!(defaults.log_file_max_rolls(), 3);
    }

    /// `enabled()` follows `dogstatsd_logging_enabled` independently of the metrics stats gate.
    #[tokio::test]
    async fn logging_enabled_controls_topology_wiring() {
        // (raw configuration, expected `enabled()`, expected `metrics_stats_enabled`)
        let cases = [
            (r#"{ "dogstatsd_metrics_stats_enable": true }"#, true, true),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": true,
                    "dogstatsd_logging_enabled": true
                }"#,
                true,
                true,
            ),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": true,
                    "dogstatsd_logging_enabled": false
                }"#,
                false,
                true,
            ),
            (
                r#"{
                    "dogstatsd_metrics_stats_enable": false,
                    "dogstatsd_logging_enabled": true
                }"#,
                true,
                false,
            ),
        ];

        for &(raw_json, want_enabled, want_metrics_stats) in cases.iter() {
            let config = deser_config(raw_json).await;

            assert_eq!(config.enabled(), want_enabled);
            assert_eq!(config.metrics_stats_enabled, want_metrics_stats);
        }
    }

    /// An explicitly configured log file path is not replaced by the default.
    #[tokio::test]
    async fn explicit_log_file_path_is_preserved() {
        let config = deser_config(r#"{ "dogstatsd_log_file": "/tmp/dsd-debug.log" }"#).await;

        assert_eq!(config.log_file(), Path::new("/tmp/dsd-debug.log"));
    }

    /// A negative roll count cannot be deserialized into the unsigned setting.
    #[tokio::test]
    async fn negative_log_file_max_rolls_is_rejected() {
        let raw_values = json!({ "dogstatsd_log_file_max_rolls": -1 });
        let (generic_config, _) = saluki_config::ConfigurationLoader::for_tests(Some(raw_values), None, false).await;

        let outcome = DogStatsDDebugLogConfiguration::from_configuration(&generic_config, test_default_log_file_path());
        assert!(outcome.is_err());
    }

    /// Runs the shared configuration smoke tests against this configuration type.
    #[tokio::test]
    async fn smoke_test() {
        run_config_smoke_tests(structs::DOGSTATSD_DEBUG_LOG_CONFIGURATION, &[], json!({}), |cfg| {
            DogStatsDDebugLogConfiguration::from_configuration(&cfg, test_default_log_file_path())
                .expect("DogStatsDDebugLogConfiguration should deserialize")
        })
        .await
    }

    /// Builds a configuration with both gates enabled and the given file settings.
    async fn test_config(log_file: PathBuf, max_size: ByteSize, max_rolls: usize) -> DogStatsDDebugLogConfiguration {
        let raw_values = json!({
            "dogstatsd_metrics_stats_enable": true,
            "dogstatsd_logging_enabled": true,
            "dogstatsd_log_file": log_file.display().to_string(),
            "dogstatsd_log_file_max_size": max_size.as_u64(),
            "dogstatsd_log_file_max_rolls": max_rolls,
        });
        let (generic_config, _) = saluki_config::ConfigurationLoader::for_tests(Some(raw_values), None, false).await;

        DogStatsDDebugLogConfiguration::from_configuration(&generic_config, test_default_log_file_path())
            .expect("DogStatsDDebugLogConfiguration should deserialize")
    }

    /// Concatenates the contents of the log file and its rolled siblings, highest roll number first.
    fn read_log_files(log_file: &Path, max_rolls: usize) -> String {
        (0..=max_rolls)
            .rev()
            .map(|roll| rolled_path(log_file, roll))
            .filter(|candidate| candidate.exists())
            .map(|existing| fs::read_to_string(&existing).expect("debug log file should be readable"))
            .collect()
    }

    /// Returns the path of the `roll`-th rolled file; roll `0` is the live log file itself.
    fn rolled_path(log_file: &Path, roll: usize) -> PathBuf {
        match roll {
            0 => log_file.to_path_buf(),
            nth => PathBuf::from(format!("{}.{}", log_file.display(), nth)),
        }
    }

    /// A counter metric with a fixed name and two tags.
    fn tagged_metric() -> Metric {
        Metric::counter(
            Context::from_static_parts("custom.metric", &["env:prod", "service:web"]),
            1.0,
        )
    }

    /// Two writes of the same context produce two lines with an increasing count.
    #[tokio::test]
    async fn writes_metric_debug_lines_and_updates_count() {
        let scratch = tempdir().expect("temporary directory should be created");
        let log_path = scratch.path().join("dogstatsd-stats.log");
        let config = test_config(log_path.clone(), ByteSize::kb(64), 3).await;
        let sample_metric = tagged_metric();

        let mut debug_log =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        debug_log
            .write_metric(&sample_metric)
            .expect("first metric should be written");
        debug_log
            .write_metric(&sample_metric)
            .expect("second metric should be written");
        // The log is only read back after the destination (and with it the writer) has been dropped.
        drop(debug_log);

        let contents = read_log_files(&log_path, config.log_file_max_rolls());
        let lines: Vec<&str> = contents.lines().collect();

        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("Metric Name: custom.metric"));
        assert!(lines[0].contains("Tags: {env:prod service:web}"));
        assert!(lines[0].contains("Count: 1"));
        assert!(lines[0].contains("Last Seen: "));
        assert!(lines[1].contains("Count: 2"));
    }

    /// Metrics are dropped while the gate is off and written only while it is on.
    #[tokio::test]
    async fn dynamically_drops_until_metrics_stats_are_enabled() {
        use std::time::Duration;

        use saluki_config::dynamic::ConfigUpdate;

        let update_timeout = Duration::from_secs(2);

        let scratch = tempdir().expect("temporary directory should be created");
        let log_path = scratch.path().join("dogstatsd-stats.log");
        let initial_values = json!({
            "dogstatsd_log_file": log_path.display().to_string(),
            "dogstatsd_log_file_max_size": "64kb",
            "dogstatsd_log_file_max_rolls": 3,
            "dogstatsd_logging_enabled": true
        });
        let (generic_config, update_tx) =
            saluki_config::ConfigurationLoader::for_tests(Some(initial_values), None, true).await;
        let update_tx = update_tx.expect("dynamic sender should be present");
        update_tx
            .send(ConfigUpdate::Snapshot(json!({})))
            .await
            .expect("initial dynamic snapshot should be sent");
        generic_config.ready().await;

        let dsd_config =
            DogStatsDDebugLogConfiguration::from_configuration(&generic_config, test_default_log_file_path())
                .expect("DogStatsDDebugLogConfiguration should deserialize");
        assert!(dsd_config.enabled());
        assert!(!dsd_config.metrics_stats_enabled);

        let sample_metric = tagged_metric();
        let mut debug_log =
            DogStatsDDebugLog::from_configuration(&dsd_config).expect("debug log destination should be built");
        let mut gate_watcher = debug_log
            .configuration
            .watch_for_updates(super::DOGSTATSD_METRICS_STATS_ENABLE_KEY);

        // Gate is off: nothing is written and the file is not even created.
        debug_log
            .process_metric(&sample_metric)
            .expect("disabled metric should be dropped cleanly");
        assert!(!log_path.exists());

        // Turn the gate on and apply the observed value by hand, then write one metric.
        update_tx
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(true),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, enabled_update) = tokio::time::timeout(update_timeout, gate_watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive enabled update");
        debug_log.metrics_stats_enabled = enabled_update.expect("metrics stats update should have a new value");

        debug_log
            .process_metric(&sample_metric)
            .expect("enabled metric should be written");

        // Turn the gate off again; the next metric must be dropped.
        update_tx
            .send(ConfigUpdate::Partial {
                key: "dogstatsd_metrics_stats_enable".to_string(),
                value: json!(false),
            })
            .await
            .expect("dynamic update should be sent");

        let (_, disabled_update) = tokio::time::timeout(update_timeout, gate_watcher.changed::<bool>())
            .await
            .expect("metrics stats flag should receive disabled update");
        debug_log.metrics_stats_enabled = disabled_update.expect("metrics stats update should have a new value");

        debug_log
            .process_metric(&sample_metric)
            .expect("disabled metric should be dropped cleanly");
        drop(debug_log);

        let contents = read_log_files(&log_path, dsd_config.log_file_max_rolls());
        let lines: Vec<&str> = contents.lines().collect();

        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("Metric Name: custom.metric"));
        assert!(lines[0].contains("Count: 1"));
    }

    /// With a size limit shorter than one line, writes spill into exactly the configured number of rolled files.
    #[tokio::test]
    async fn rotates_log_file_at_configured_size() {
        let scratch = tempdir().expect("temporary directory should be created");
        let log_path = scratch.path().join("dogstatsd-stats.log");
        // Length of a debug line without its timestamp, so every complete line exceeds the limit.
        let min_debug_line_len =
            "Metric Name: custom.metric | Tags: {env:prod service:web} | Count: 1 | Last Seen: ".len();
        let size_limit = ByteSize::b(min_debug_line_len as u64);
        let config = test_config(log_path.clone(), size_limit, 2).await;
        let sample_metric = tagged_metric();

        let mut debug_log =
            DogStatsDDebugLog::from_configuration(&config).expect("debug log destination should be built");
        for _ in 0..12 {
            debug_log.write_metric(&sample_metric).expect("metric should be written");
        }
        drop(debug_log);

        assert!(log_path.exists());
        assert!(rolled_path(&log_path, 1).exists());
        assert!(rolled_path(&log_path, 2).exists());
        assert!(!rolled_path(&log_path, 3).exists());

        let contents = read_log_files(&log_path, config.log_file_max_rolls());
        assert!(contents.contains("Metric Name: custom.metric"));
    }

    /// A log file that cannot be opened produces an error naming both the config key and the path.
    #[tokio::test]
    async fn build_error_mentions_log_file_config_key_and_path() {
        let scratch = tempdir().expect("temporary directory should be created");
        // A regular file stands where the log file's parent directory would have to be.
        let blocked_parent = scratch.path().join("not-a-directory");
        fs::write(&blocked_parent, "not a directory").expect("blocking file should be written");
        let log_path = blocked_parent.join("dogstatsd-stats.log");
        let config = test_config(log_path.clone(), ByteSize::kb(64), 3).await;

        let message = match DogStatsDDebugLog::from_configuration(&config) {
            Err(err) => err.to_string(),
            Ok(_) => panic!("build should fail"),
        };

        assert!(message.contains("dogstatsd_log_file"));
        assert!(message.contains(&log_path.display().to_string()));
    }
}