Skip to main content

saluki_components/sources/dogstatsd/replay/
replay_control.rs

//! Replay session control surface for the DogStatsD source.
//!
//! The ADP process owns only the active replay session and its captured tagger snapshot. A separate replay client is
//! responsible for reading capture files and sending replay packets into the DogStatsD UDS listener.
5
6use std::sync::{Arc, Mutex};
7
8use datadog_protos::agent::TaggerState;
9use rand::RngExt as _;
10use saluki_error::{generic_error, GenericError};
11
12use super::{CapturedTaggerHandle, CapturedTaggerStore};
13
/// Error message used when a replay request arrives before the DogStatsD source has bound its captured tagger handle
/// to the control handle.
const UNAVAILABLE_REPLAY_CONTROL_ERROR: &str =
    "DogStatsD replay control is unavailable because the source is not running.";

/// Error message used when a replay session is requested while another one is still active.
const REPLAY_ALREADY_IN_PROGRESS_ERROR: &str = "DogStatsD replay already in progress.";

/// Number of times a capture file is replayed when the CLI does not specify a loop count.
pub const DEFAULT_REPLAY_LOOPS: u32 = 1;
21
/// An active DogStatsD replay session, as handed out by `DogStatsDReplayControl::start_session`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplaySession {
    /// Opaque token the replay client must pass back to `DogStatsDReplayControl::finish_session` to end this session.
    pub id: String,
}
28
29/// Shared control handle for DogStatsD replay sessions.
30///
31/// Created before the source is built and bound to the live captured tagger handle during source construction. The
32/// HTTP API handler holds a clone of this handle so a replay client can acquire and release the single active session.
33#[derive(Clone, Default)]
34pub struct DogStatsDReplayControl {
35    inner: Arc<Mutex<ReplayControlState>>,
36}
37
38#[derive(Default)]
39struct ReplayControlState {
40    captured_tagger: Option<CapturedTaggerHandle>,
41    active_session_id: Option<String>,
42}
43
44impl DogStatsDReplayControl {
45    /// Creates a new, unbound replay control handle.
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Binds the control handle to the captured tagger store used by the running DogStatsD source.
51    pub(crate) fn bind(&self, captured_tagger: CapturedTaggerHandle) {
52        let mut state = self.inner.lock().expect("replay control mutex poisoned");
53        state.captured_tagger = Some(captured_tagger);
54    }
55
56    /// Starts a replay session and sets the captured tagger state for replay-origin resolution.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if the DogStatsD source hasn't been built and bound to this control handle yet, or if another
61    /// replay session is already active.
62    pub fn start_session(&self, tagger_state: Option<TaggerState>) -> Result<ReplaySession, GenericError> {
63        let mut state = self.inner.lock().expect("replay control mutex poisoned");
64        let captured_tagger = state
65            .captured_tagger
66            .clone()
67            .ok_or_else(|| generic_error!("{}", UNAVAILABLE_REPLAY_CONTROL_ERROR))?;
68
69        if state.active_session_id.is_some() {
70            return Err(generic_error!("{}", REPLAY_ALREADY_IN_PROGRESS_ERROR));
71        }
72
73        let session = ReplaySession {
74            id: new_replay_session_id(),
75        };
76
77        let captured_store = tagger_state.map(CapturedTaggerStore::from_tagger_state);
78        captured_tagger.set_current(captured_store);
79        state.active_session_id = Some(session.id.clone());
80
81        Ok(session)
82    }
83
84    /// Finishes a replay session and removes the captured tagger state it owns.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the DogStatsD source hasn't been built and bound to this control handle yet, or if the given
89    /// session does not own the active replay.
90    pub fn finish_session(&self, session_id: &str) -> Result<(), GenericError> {
91        let mut state = self.inner.lock().expect("replay control mutex poisoned");
92        let captured_tagger = state
93            .captured_tagger
94            .clone()
95            .ok_or_else(|| generic_error!("{}", UNAVAILABLE_REPLAY_CONTROL_ERROR))?;
96
97        match state.active_session_id.as_deref() {
98            Some(active_session_id) if active_session_id == session_id => {
99                captured_tagger.set_current(None);
100                state.active_session_id = None;
101                Ok(())
102            }
103            Some(active_session_id) => Err(generic_error!(
104                "DogStatsD replay session '{}' does not own active replay session '{}'.",
105                session_id,
106                active_session_id
107            )),
108            None => Ok(()),
109        }
110    }
111}
112
113fn new_replay_session_id() -> String {
114    format!("{:032x}", rand::rng().random::<u128>())
115}
116
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use datadog_protos::agent::{Entity as ProtoEntity, TaggerState};
    use saluki_context::origin::OriginTagCardinality;

    use super::*;

    /// Builds a tagger snapshot mapping PID 99 to a container entity carrying one low-cardinality tag.
    fn make_state() -> TaggerState {
        let entity = ProtoEntity {
            low_cardinality_tags: vec!["env:prod".into()],
            ..Default::default()
        };

        let mut entities = HashMap::new();
        entities.insert("container_id://container-xyz".to_string(), entity);

        let mut pid_map = HashMap::new();
        pid_map.insert(99, "container_id://container-xyz".to_string());

        TaggerState {
            state: entities,
            pid_map,
            duration: 0,
        }
    }

    #[test]
    fn control_requires_bound_store() {
        let control = DogStatsDReplayControl::new();
        let err = control
            .start_session(Some(make_state()))
            .expect_err("unbound control should fail");
        assert_eq!(err.to_string(), UNAVAILABLE_REPLAY_CONTROL_ERROR);
    }

    #[test]
    fn finish_session_requires_bound_store() {
        let control = DogStatsDReplayControl::new();
        let err = control
            .finish_session("any-session")
            .expect_err("unbound control should fail");
        assert_eq!(err.to_string(), UNAVAILABLE_REPLAY_CONTROL_ERROR);
    }

    #[test]
    fn session_start_and_finish_update_bound_store() {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger.clone());

        let session = control.start_session(Some(make_state())).expect("session should start");
        let store = captured_tagger.current().expect("state should be set");
        assert!(store.lookup(99, OriginTagCardinality::Low).is_some());

        control
            .finish_session(&session.id)
            .expect("matching session should finish");
        assert!(captured_tagger.current().is_none());
    }

    #[test]
    fn session_rejects_concurrent_start() {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger);

        let _session = control
            .start_session(Some(make_state()))
            .expect("first session should start");
        let err = control
            .start_session(Some(make_state()))
            .expect_err("second session should fail");
        assert!(err.to_string().contains("replay already in progress"));
    }

    #[test]
    fn finish_session_rejects_non_owner() {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger.clone());

        let session = control.start_session(Some(make_state())).expect("session should start");
        let err = control
            .finish_session("wrong-session")
            .expect_err("non-owner should fail");
        assert!(err.to_string().contains("does not own"));

        assert!(captured_tagger.current().is_some());
        control
            .finish_session(&session.id)
            .expect("matching session should finish");
        assert!(captured_tagger.current().is_none());
    }

    #[test]
    fn finish_session_without_active_session_is_noop() {
        let control = DogStatsDReplayControl::new();
        control.bind(CapturedTaggerHandle::new());

        control
            .finish_session("no-such-session")
            .expect("finishing with no active session should succeed");
    }

    #[test]
    fn session_can_restart_after_finish() {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger.clone());

        let first = control
            .start_session(Some(make_state()))
            .expect("first session should start");
        control
            .finish_session(&first.id)
            .expect("first session should finish");

        let second = control
            .start_session(Some(make_state()))
            .expect("second session should start after the first finished");
        assert_ne!(first.id, second.id);
        assert!(captured_tagger.current().is_some());
    }

    #[test]
    fn session_without_tagger_state_still_holds_replay() {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger.clone());

        let session = control.start_session(None).expect("session should start without state");
        assert!(captured_tagger.current().is_none());

        let err = control
            .start_session(None)
            .expect_err("a stateless session still owns the replay");
        assert_eq!(err.to_string(), REPLAY_ALREADY_IN_PROGRESS_ERROR);

        control
            .finish_session(&session.id)
            .expect("matching session should finish");
    }

    #[test]
    fn session_ids_are_32_lowercase_hex_digits() {
        let id = new_replay_session_id();
        assert_eq!(id.len(), 32);
        assert!(id.chars().all(|c| matches!(c, '0'..='9' | 'a'..='f')));
    }
}