saluki_components/sources/dogstatsd/replay/
reader.rs1use std::{fs, path::Path};
7
8use datadog_protos::agent::{TaggerState, UnixDogstatsdMsg};
9use prost::Message;
10use saluki_error::{generic_error, GenericError};
11
12use super::file_header::{file_version, valid_header, DATADOG_HEADER, MIN_NANO_VERSION, MIN_STATE_VERSION};
13
14const ZSTD_MAGIC: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];
15const LENGTH_PREFIX_SIZE: usize = 4;
16
/// Unit of the timestamps stored in a capture file.
///
/// Reported by [`TrafficCaptureReader::timestamp_resolution`], which derives it from the file version.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimestampResolution {
    /// Timestamps are expressed in seconds.
    Seconds,

    /// Timestamps are expressed in nanoseconds.
    Nanoseconds,
}
25
/// In-memory reader over a DogStatsD traffic capture file.
///
/// The reader owns the full (already decompressed) file contents and keeps a cursor into them that
/// advances as records are read.
#[derive(Debug)]
pub struct TrafficCaptureReader {
    /// Entire capture file, after any decompression.
    contents: Vec<u8>,
    /// Capture format version taken from the file header.
    version: u8,
    /// Byte position in `contents` of the next length-prefixed record.
    offset: usize,
}
33
34impl TrafficCaptureReader {
35 pub fn from_path(path: &Path) -> Result<Self, GenericError> {
40 let raw =
41 fs::read(path).map_err(|e| generic_error!("Failed to read capture file '{}': {}", path.display(), e))?;
42
43 let contents = if has_zstd_magic(&raw) {
44 zstd::stream::decode_all(raw.as_slice())
45 .map_err(|e| generic_error!("Failed to decompress capture file '{}': {}", path.display(), e))?
46 } else {
47 raw
48 };
49
50 if !valid_header(&contents) {
51 return Err(generic_error!(
52 "Capture file '{}' does not begin with a valid Datadog capture header.",
53 path.display()
54 ));
55 }
56
57 let version = file_version(&contents)?;
58
59 Ok(Self {
60 contents,
61 version,
62 offset: DATADOG_HEADER.len(),
63 })
64 }
65
66 pub fn version(&self) -> u8 {
68 self.version
69 }
70
71 pub fn timestamp_resolution(&self) -> TimestampResolution {
73 if self.version < MIN_NANO_VERSION {
74 TimestampResolution::Seconds
75 } else {
76 TimestampResolution::Nanoseconds
77 }
78 }
79
80 pub fn read_next(&mut self) -> Result<Option<UnixDogstatsdMsg>, GenericError> {
85 if self.offset + LENGTH_PREFIX_SIZE > self.contents.len() {
86 return Ok(None);
87 }
88
89 let size_bytes = &self.contents[self.offset..self.offset + LENGTH_PREFIX_SIZE];
90 let size = u32::from_le_bytes(size_bytes.try_into().expect("length prefix is 4 bytes")) as usize;
91 self.offset += LENGTH_PREFIX_SIZE;
92
93 if size == 0 || self.offset + size > self.contents.len() {
96 return Ok(None);
97 }
98
99 let msg = UnixDogstatsdMsg::decode(&self.contents[self.offset..self.offset + size])
100 .map_err(|e| generic_error!("Failed to decode captured DogStatsD record: {}", e))?;
101 self.offset += size;
102
103 Ok(Some(msg))
104 }
105
106 pub fn read_state(&self) -> Result<Option<TaggerState>, GenericError> {
111 if self.version < MIN_STATE_VERSION {
112 return Ok(None);
113 }
114
115 let len = self.contents.len();
116 if len < LENGTH_PREFIX_SIZE {
117 return Ok(None);
118 }
119
120 let size_bytes = &self.contents[len - LENGTH_PREFIX_SIZE..len];
121 let size = u32::from_le_bytes(size_bytes.try_into().expect("length suffix is 4 bytes")) as usize;
122 if size == 0 {
123 return Ok(None);
124 }
125
126 if size + LENGTH_PREFIX_SIZE > len {
127 return Err(generic_error!(
128 "Tagger state trailer size ({}) exceeds capture file length ({}).",
129 size,
130 len
131 ));
132 }
133
134 let state_start = len - LENGTH_PREFIX_SIZE - size;
135 let state = TaggerState::decode(&self.contents[state_start..len - LENGTH_PREFIX_SIZE])
136 .map_err(|e| generic_error!("Failed to decode tagger state trailer: {}", e))?;
137 Ok(Some(state))
138 }
139}
140
141fn has_zstd_magic(buf: &[u8]) -> bool {
142 buf.len() >= ZSTD_MAGIC.len() && buf[..ZSTD_MAGIC.len()] == ZSTD_MAGIC
143}
144
#[cfg(test)]
mod tests {
    use std::{
        fs,
        path::PathBuf,
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::{Duration, SystemTime, UNIX_EPOCH},
    };

    use saluki_common::collections::FastHashMap;
    use saluki_context::{
        origin::OriginTagCardinality,
        tags::{SharedTagSet, Tag, TagSet},
    };
    use saluki_env::{
        workload::{origin::ResolvedOrigin, EntityId},
        WorkloadProvider,
    };

    use super::*;
    use crate::sources::dogstatsd::replay::writer::{CaptureRecord, CaptureTargetDir, TrafficCaptureWriter};

    /// Workload provider backed by a fixed entity-to-tags map.
    #[derive(Default)]
    struct MockWorkloadProvider {
        entities: FastHashMap<EntityId, SharedTagSet>,
    }

    impl MockWorkloadProvider {
        /// Builds a provider that knows exactly one entity with the given tags.
        fn with_entity(entity_id: EntityId, tags: &[&str]) -> Self {
            let mut entities = FastHashMap::default();
            entities.insert(entity_id, shared_tags(tags));
            Self { entities }
        }
    }

    impl WorkloadProvider for MockWorkloadProvider {
        // Returns the same tags for every cardinality; unknown entities yield `None`.
        fn get_tags_for_entity(
            &self, entity_id: &EntityId, _cardinality: OriginTagCardinality,
        ) -> Option<SharedTagSet> {
            self.entities.get(entity_id).cloned()
        }

        // Origin resolution is not exercised by these tests.
        fn get_resolved_origin(&self, _origin: saluki_context::origin::RawOrigin<'_>) -> Option<ResolvedOrigin> {
            None
        }
    }

    /// An uncompressed capture written by the writer is read back field-for-field.
    #[test]
    fn plain_capture_round_trip() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(100, b"metric.a:1|c", 11)]);

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        assert_eq!(reader.version(), 3);
        assert_eq!(reader.timestamp_resolution(), TimestampResolution::Nanoseconds);

        let msg = reader.read_next().expect("read should succeed").expect("first record");
        assert_eq!(msg.timestamp, 100);
        assert_eq!(msg.payload, b"metric.a:1|c");
        assert_eq!(msg.payload_size, msg.payload.len() as i32);
        assert_eq!(msg.pid, 11);

        assert!(reader.read_next().expect("read should succeed").is_none());
    }

    /// A compressed capture yields its records in the order they were enqueued.
    #[test]
    fn compressed_capture_round_trip() {
        let (path, _dir_guard) = run_capture(
            1,
            true,
            &[
                sample_record(1, b"metric.a:1|c", 1),
                sample_record(2, b"metric.b:2|c", 1),
                sample_record(3, b"metric.c:3|c", 1),
            ],
        );

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");

        for expected_ts in [1, 2, 3] {
            let msg = reader.read_next().expect("read should succeed").expect("record");
            assert_eq!(msg.timestamp, expected_ts);
        }

        assert!(reader.read_next().expect("read should succeed").is_none());
    }

    /// After the last record, `read_next` reports end-of-records and keeps doing so.
    #[test]
    fn read_next_stops_at_state_separator() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(7, b"x:1|c", 5)]);

        let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        assert!(reader.read_next().expect("first record").is_some());

        assert!(reader.read_next().expect("trailer boundary").is_none());
        assert!(reader.read_next().expect("idempotent EOF").is_none());
    }

    /// A capture with its last eight bytes removed still ends with a clean `Ok(None)`.
    #[test]
    fn truncated_record_returns_none() {
        let (path, _dir_guard) = run_capture(1, false, &[sample_record(1, b"metric.a:1|c", 1)]);

        let bytes = fs::read(&path).expect("capture readable");
        // Written next to the capture so the directory guard removes it as well.
        let truncated_path = path.with_extension("truncated");
        fs::write(&truncated_path, &bytes[..bytes.len().saturating_sub(8)]).expect("write truncated");

        let mut reader = TrafficCaptureReader::from_path(&truncated_path).expect("reader should open");
        // The outcome of the first read is intentionally ignored; only the following read is checked.
        let _ = reader.read_next();
        assert!(reader.read_next().expect("clean EOF on truncation").is_none());
    }

    /// A file that does not start with the capture header is rejected when opened.
    #[test]
    fn bad_header_is_rejected() {
        // Keep the garbage file inside a guarded directory so it is removed even if an assertion fails.
        let dir = unique_dir("reader-bad-header");
        let _dir_guard = DirGuard { path: dir.clone() };
        let tmp = dir.join("garbage.capture");
        fs::write(&tmp, b"this is not a capture file").expect("write garbage");

        let err = TrafficCaptureReader::from_path(&tmp).expect_err("bad header should fail");
        assert!(err.to_string().contains("Datadog capture header"));
    }

    /// Tagger state written alongside the capture can be read back from the trailer.
    #[test]
    fn read_state_recovers_entity_tags() {
        let target_dir = unique_dir("reader-state");
        // Guard the directory up front so a failing assertion does not leak it.
        let _dir_guard = DirGuard {
            path: target_dir.clone(),
        };
        let workload = Arc::new(MockWorkloadProvider::with_entity(
            EntityId::Container("container-xyz".into()),
            &["env:prod", "service:api"],
        ));
        let writer = TrafficCaptureWriter::with_workload_provider(1, Some(workload));

        let path = writer
            .start_capture(
                CaptureTargetDir::Explicit(target_dir.clone()),
                Duration::from_millis(250),
                false,
            )
            .expect("capture should start");
        assert!(writer.enqueue(CaptureRecord {
            timestamp_ns: 1,
            payload: b"metric:1|c".to_vec(),
            pid: Some(99),
            ancillary: Vec::new(),
            container_id: Some("container_id://container-xyz".to_string()),
        }));
        writer.stop_capture();
        wait_until_inactive(&writer);

        let reader = TrafficCaptureReader::from_path(&path).expect("reader should open");
        let state = reader
            .read_state()
            .expect("state should decode")
            .expect("state present");
        let entity = state
            .state
            .get("container_id://container-xyz")
            .expect("captured entity present");
        assert_eq!(
            entity.low_cardinality_tags,
            vec!["env:prod".to_string(), "service:api".to_string()]
        );
        assert_eq!(
            state.pid_map.get(&99).map(String::as_str),
            Some("container_id://container-xyz")
        );
    }

    /// Runs a full capture of `records` into a fresh directory.
    ///
    /// Returns the capture file path together with a guard that deletes the directory on drop.
    fn run_capture(queue_depth: usize, compressed: bool, records: &[CaptureRecord]) -> (PathBuf, DirGuard) {
        let target_dir = unique_dir("reader-capture");
        // Create the guard before anything can panic so the directory is always cleaned up.
        let dir_guard = DirGuard {
            path: target_dir.clone(),
        };
        let writer = TrafficCaptureWriter::new(queue_depth);

        let path = writer
            .start_capture(
                CaptureTargetDir::Explicit(target_dir),
                Duration::from_millis(500),
                compressed,
            )
            .expect("capture should start");

        for record in records {
            assert!(writer.enqueue(clone_record(record)));
        }
        writer.stop_capture();
        wait_until_inactive(&writer);

        (path, dir_guard)
    }

    /// Field-by-field copy of a `CaptureRecord`.
    fn clone_record(record: &CaptureRecord) -> CaptureRecord {
        CaptureRecord {
            timestamp_ns: record.timestamp_ns,
            payload: record.payload.clone(),
            pid: record.pid,
            ancillary: record.ancillary.clone(),
            container_id: record.container_id.clone(),
        }
    }

    /// Builds a record with no ancillary data and a container ID derived from `pid`.
    fn sample_record(timestamp_ns: i64, payload: &[u8], pid: i32) -> CaptureRecord {
        CaptureRecord {
            timestamp_ns,
            payload: payload.to_vec(),
            pid: Some(pid),
            ancillary: Vec::new(),
            container_id: Some(format!("container_id://container-{}", pid)),
        }
    }

    /// Converts string tags into a `SharedTagSet`.
    fn shared_tags(tags: &[&str]) -> SharedTagSet {
        TagSet::from_iter(tags.iter().copied().map(Tag::from)).into_shared()
    }

    /// Polls the writer until it reports no ongoing capture, failing the test after two seconds.
    fn wait_until_inactive(writer: &TrafficCaptureWriter) {
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while writer.is_ongoing() && std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(10));
        }
        assert!(!writer.is_ongoing(), "capture writer did not stop in time");
    }

    /// Creates and returns a fresh directory under the system temp dir.
    fn unique_dir(label: &str) -> PathBuf {
        let path = unique_path(label);
        fs::create_dir_all(&path).expect("test directory should be created");
        path
    }

    /// Returns a path under the system temp dir that has not been handed out before in this process.
    fn unique_path(label: &str) -> PathBuf {
        // Tests run on parallel threads of one process, so pid + wall-clock time alone can repeat on
        // platforms with a coarse clock; the counter makes every path distinct within the process.
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock should be after epoch")
            .as_nanos();
        let sequence = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "saluki-{}-{}-{}-{}",
            label,
            std::process::id(),
            timestamp,
            sequence
        ))
    }

    /// Removes the wrapped directory tree when dropped; removal errors are ignored.
    struct DirGuard {
        path: PathBuf,
    }

    impl Drop for DirGuard {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.path);
        }
    }
}