Skip to main content

saluki_components/sources/dogstatsd/replay/
reader.rs

1//! Reader for Datadog DogStatsD capture files.
2//!
3//! Decodes a `.dog` or `.dog.zstd` capture file into the sequence of `UnixDogstatsdMsg` records it
4//! contains, plus the optional `TaggerState` trailer.
5
6use std::{fs, path::Path};
7
8use datadog_protos::agent::{TaggerState, UnixDogstatsdMsg};
9use prost::Message;
10use saluki_error::{generic_error, GenericError};
11
12use super::file_header::{file_version, valid_header, DATADOG_HEADER, MIN_NANO_VERSION, MIN_STATE_VERSION};
13
/// Leading bytes of a zstd frame: the magic number `0xFD2FB528` in little-endian byte order.
const ZSTD_MAGIC: [u8; 4] = 0xFD2F_B528_u32.to_le_bytes();

/// Size, in bytes, of the little-endian `u32` length field used to frame records and the trailer.
const LENGTH_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
16
/// Unit in which record timestamps are stored in a capture file.
///
/// The unit is not stored explicitly; it is derived from the capture file version.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampResolution {
    /// Whole seconds: used by file versions below `MIN_NANO_VERSION`.
    Seconds,
    /// Nanoseconds: used by file versions at or above `MIN_NANO_VERSION`.
    Nanoseconds,
}
25
/// In-memory reader over a DogStatsD traffic capture file.
///
/// The file is loaded (and decompressed, if needed) up front; records are then decoded one at a
/// time from the buffered bytes.
#[derive(Debug)]
pub struct TrafficCaptureReader {
    /// Complete, uncompressed contents of the capture file, header included.
    contents: Vec<u8>,
    /// Capture file version parsed from the header.
    version: u8,
    /// Byte index into `contents` at which `read_next` looks for the next length prefix.
    offset: usize,
}
33
34impl TrafficCaptureReader {
35    /// Opens a capture file at the given path.
36    ///
37    /// Detects zstd-compressed inputs by magic bytes and decompresses transparently. Validates the
38    /// Datadog capture header and parses the file version.
39    pub fn from_path(path: &Path) -> Result<Self, GenericError> {
40        let raw =
41            fs::read(path).map_err(|e| generic_error!("Failed to read capture file '{}': {}", path.display(), e))?;
42
43        let contents = if has_zstd_magic(&raw) {
44            zstd::stream::decode_all(raw.as_slice())
45                .map_err(|e| generic_error!("Failed to decompress capture file '{}': {}", path.display(), e))?
46        } else {
47            raw
48        };
49
50        if !valid_header(&contents) {
51            return Err(generic_error!(
52                "Capture file '{}' does not begin with a valid Datadog capture header.",
53                path.display()
54            ));
55        }
56
57        let version = file_version(&contents)?;
58
59        Ok(Self {
60            contents,
61            version,
62            offset: DATADOG_HEADER.len(),
63        })
64    }
65
66    /// Returns the capture file version.
67    pub fn version(&self) -> u8 {
68        self.version
69    }
70
71    /// Returns the timestamp resolution implied by the file version.
72    pub fn timestamp_resolution(&self) -> TimestampResolution {
73        if self.version < MIN_NANO_VERSION {
74            TimestampResolution::Seconds
75        } else {
76            TimestampResolution::Nanoseconds
77        }
78    }
79
80    /// Reads the next captured DogStatsD record from the file.
81    ///
82    /// Returns `Ok(None)` when the stream of records is exhausted: at EOF, at the four-byte state
83    /// separator that introduces the tagger trailer, or at a truncated record boundary.
84    pub fn read_next(&mut self) -> Result<Option<UnixDogstatsdMsg>, GenericError> {
85        if self.offset + LENGTH_PREFIX_SIZE > self.contents.len() {
86            return Ok(None);
87        }
88
89        let size_bytes = &self.contents[self.offset..self.offset + LENGTH_PREFIX_SIZE];
90        let size = u32::from_le_bytes(size_bytes.try_into().expect("length prefix is 4 bytes")) as usize;
91        self.offset += LENGTH_PREFIX_SIZE;
92
93        // The writer emits a zero-length prefix to mark the start of the tagger state trailer; treat
94        // that (and any size that would overrun the buffer) as the end of the record stream.
95        if size == 0 || self.offset + size > self.contents.len() {
96            return Ok(None);
97        }
98
99        let msg = UnixDogstatsdMsg::decode(&self.contents[self.offset..self.offset + size])
100            .map_err(|e| generic_error!("Failed to decode captured DogStatsD record: {}", e))?;
101        self.offset += size;
102
103        Ok(Some(msg))
104    }
105
106    /// Reads the tagger state trailer from the end of the capture file.
107    ///
108    /// Returns `Ok(None)` if the file version predates state support, or if the trailer is empty.
109    /// Does not modify the read offset, so it can be called independently of `read_next`.
110    pub fn read_state(&self) -> Result<Option<TaggerState>, GenericError> {
111        if self.version < MIN_STATE_VERSION {
112            return Ok(None);
113        }
114
115        let len = self.contents.len();
116        if len < LENGTH_PREFIX_SIZE {
117            return Ok(None);
118        }
119
120        let size_bytes = &self.contents[len - LENGTH_PREFIX_SIZE..len];
121        let size = u32::from_le_bytes(size_bytes.try_into().expect("length suffix is 4 bytes")) as usize;
122        if size == 0 {
123            return Ok(None);
124        }
125
126        if size + LENGTH_PREFIX_SIZE > len {
127            return Err(generic_error!(
128                "Tagger state trailer size ({}) exceeds capture file length ({}).",
129                size,
130                len
131            ));
132        }
133
134        let state_start = len - LENGTH_PREFIX_SIZE - size;
135        let state = TaggerState::decode(&self.contents[state_start..len - LENGTH_PREFIX_SIZE])
136            .map_err(|e| generic_error!("Failed to decode tagger state trailer: {}", e))?;
137        Ok(Some(state))
138    }
139}
140
141fn has_zstd_magic(buf: &[u8]) -> bool {
142    buf.len() >= ZSTD_MAGIC.len() && buf[..ZSTD_MAGIC.len()] == ZSTD_MAGIC
143}
144
#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::PathBuf,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::{Duration, Instant, SystemTime, UNIX_EPOCH},
    };

    use saluki_common::collections::FastHashMap;
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag, TagSet},
    };
    use saluki_env::{
        workload::{origin::ResolvedOrigin, EntityId},
        WorkloadProvider,
    };

    use super::*;
    use crate::sources::dogstatsd::replay::writer::{CaptureRecord, CaptureTargetDir, TrafficCaptureWriter};

    /// Workload provider that serves a fixed entity-to-tags map and resolves no origins.
    #[derive(Default)]
    struct MockWorkloadProvider {
        entities: FastHashMap<EntityId, SharedTagSet>,
    }

    impl MockWorkloadProvider {
        /// Builds a provider that knows about a single entity with the given tags.
        fn with_entity(entity_id: EntityId, tags: &[&str]) -> Self {
            let mut entities = FastHashMap::default();
            entities.insert(entity_id, shared_tags(tags));
            Self { entities }
        }
    }

    impl WorkloadProvider for MockWorkloadProvider {
        fn get_tags_for_entity(
            &self, entity_id: &EntityId, _cardinality: OriginTagCardinality,
        ) -> Option<SharedTagSet> {
            self.entities.get(entity_id).cloned()
        }

        fn get_resolved_origin(&self, _origin: saluki_context::origin::RawOrigin<'_>) -> Option<ResolvedOrigin> {
            None
        }
    }

    #[test]
    fn plain_capture_round_trip() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(100, b"metric.a:1|c", 11)]);

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        assert_eq!(reader.version(), 3);
        assert_eq!(reader.timestamp_resolution(), TimestampResolution::Nanoseconds);

        let msg = reader.read_next().expect("read should succeed").expect("first record");
        assert_eq!(msg.timestamp, 100);
        assert_eq!(msg.payload, b"metric.a:1|c");
        assert_eq!(msg.payload_size, msg.payload.len() as i32);
        assert_eq!(msg.pid, 11);

        assert!(reader.read_next().expect("read should succeed").is_none());
    }

    #[test]
    fn compressed_capture_round_trip() {
        let (path, _dir_guard) = run_capture(
            1,
            true,
            &[
                sample_record(1, b"metric.a:1|c", 1),
                sample_record(2, b"metric.b:2|c", 1),
                sample_record(3, b"metric.c:3|c", 1),
            ],
        );

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");

        for expected_ts in [1, 2, 3] {
            let msg = reader.read_next().expect("read should succeed").expect("record");
            assert_eq!(msg.timestamp, expected_ts);
        }

        assert!(reader.read_next().expect("read should succeed").is_none());
    }

    #[test]
    fn read_next_stops_at_state_separator() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(7, b"x:1|c", 5)]);

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        assert!(reader.read_next().expect("first record").is_some());

        // Subsequent calls must yield None rather than attempting to decode the trailer as a record.
        assert!(reader.read_next().expect("trailer boundary").is_none());
        assert!(reader.read_next().expect("idempotent EOF").is_none());
    }

    #[test]
    fn truncated_record_returns_none() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(1, b"metric.a:1|c", 1)]);

        let bytes = fs::read(&path).expect("capture readable");
        // The truncated copy lives next to the capture, so the directory guard removes it too.
        let truncated_path = path.with_extension("truncated");
        // Drop the last 8 bytes so the trailer length prefix is gone and a record is incomplete.
        fs::write(&truncated_path, &bytes[..bytes.len().saturating_sub(8)]).expect("write truncated");

        let mut reader = TrafficCaptureReader::from_path(&truncated_path).expect("reader should open");
        let _ = reader.read_next();
        // Whatever the reader recovered, the next call must terminate cleanly rather than error.
        assert!(reader.read_next().expect("clean EOF on truncation").is_none());
    }

    #[test]
    fn bad_header_is_rejected() {
        // Keep the garbage file inside a guarded directory so it is removed even if an assertion
        // below fails.
        let dir = unique_dir("reader-bad-header");
        let _dir_guard = DirGuard { path: dir.clone() };
        let tmp = dir.join("garbage.dog");
        fs::write(&tmp, b"this is not a capture file").expect("write garbage");

        let err = TrafficCaptureReader::from_path(&tmp).expect_err("bad header should fail");
        assert!(err.to_string().contains("Datadog capture header"));
    }

    #[test]
    fn read_state_recovers_entity_tags() {
        let target_dir = unique_dir("reader-state");
        // Declared before the writer so the directory is removed after the writer is dropped, and
        // also when an assertion below panics.
        let _dir_guard = DirGuard {
            path: target_dir.clone(),
        };
        let workload = Arc::new(MockWorkloadProvider::with_entity(
            EntityId::Container("container-xyz".into()),
            &["env:prod", "service:api"],
        ));
        let writer = TrafficCaptureWriter::with_workload_provider(1, Some(workload));

        let path = writer
            .start_capture(
                CaptureTargetDir::Explicit(target_dir.clone()),
                Duration::from_millis(250),
                false,
            )
            .expect("capture should start");
        assert!(writer.enqueue(CaptureRecord {
            timestamp_ns: 1,
            payload: b"metric:1|c".to_vec(),
            pid: Some(99),
            ancillary: Vec::new(),
            container_id: Some("container_id://container-xyz".to_string()),
        }));
        writer.stop_capture();
        wait_until_inactive(&writer);

        let reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        let state = reader
            .read_state()
            .expect("state should decode")
            .expect("state present");
        let entity = state
            .state
            .get("container_id://container-xyz")
            .expect("captured entity present");
        assert_eq!(
            entity.low_cardinality_tags,
            vec!["env:prod".to_string(), "service:api".to_string()]
        );
        assert_eq!(
            state.pid_map.get(&99).map(String::as_str),
            Some("container_id://container-xyz")
        );
    }

    /// Runs a capture of `records` into a fresh temporary directory and returns the capture path
    /// together with a guard that deletes the directory when dropped.
    fn run_capture(queue_depth: usize, compressed: bool, records: &[CaptureRecord]) -> (PathBuf, DirGuard) {
        let target_dir = unique_dir("reader-capture");
        // Create the guard first so the directory is cleaned up even if a step below panics.
        let dir_guard = DirGuard {
            path: target_dir.clone(),
        };
        let writer = TrafficCaptureWriter::new(queue_depth);

        let path = writer
            .start_capture(
                CaptureTargetDir::Explicit(target_dir),
                Duration::from_millis(500),
                compressed,
            )
            .expect("capture should start");

        for record in records {
            assert!(writer.enqueue(clone_record(record)));
        }
        writer.stop_capture();
        wait_until_inactive(&writer);

        (path, dir_guard)
    }

    /// Field-by-field copy of a record, so a borrowed fixture can be handed to the writer by value.
    fn clone_record(record: &CaptureRecord) -> CaptureRecord {
        CaptureRecord {
            timestamp_ns: record.timestamp_ns,
            payload: record.payload.clone(),
            pid: record.pid,
            ancillary: record.ancillary.clone(),
            container_id: record.container_id.clone(),
        }
    }

    /// Builds a record with the given timestamp, payload and PID, and a container ID derived from
    /// the PID.
    fn sample_record(timestamp_ns: i64, payload: &[u8], pid: i32) -> CaptureRecord {
        CaptureRecord {
            timestamp_ns,
            payload: payload.to_vec(),
            pid: Some(pid),
            ancillary: Vec::new(),
            container_id: Some(format!("container_id://container-{}", pid)),
        }
    }

    fn shared_tags(tags: &[&str]) -> SharedTagSet {
        TagSet::from_iter(tags.iter().copied().map(Tag::from)).into_shared()
    }

    /// Polls the writer until it reports the capture as finished, failing the test after two
    /// seconds.
    fn wait_until_inactive(writer: &TrafficCaptureWriter) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while writer.is_ongoing() && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(10));
        }
        assert!(!writer.is_ongoing(), "capture writer did not stop in time");
    }

    /// Creates and returns a fresh, uniquely named directory under the system temp directory.
    fn unique_dir(label: &str) -> PathBuf {
        let path = unique_path(label);
        fs::create_dir_all(&path).expect("test directory should be created");
        path
    }

    /// Returns a path under the system temp directory that is unique within this test run.
    ///
    /// Tests run in parallel and several share a label, so the clock alone is not enough on
    /// platforms with a coarse timer; a process-wide sequence number breaks any tie.
    fn unique_path(label: &str) -> PathBuf {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock should be after epoch")
            .as_nanos();
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "saluki-{}-{}-{}-{}",
            label,
            std::process::id(),
            timestamp,
            id
        ))
    }

    /// Removes the wrapped directory (best effort) when dropped.
    struct DirGuard {
        path: PathBuf,
    }

    impl Drop for DirGuard {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.path);
        }
    }
}