saluki_components/sources/dogstatsd/replay/
replay_control.rs1use std::sync::{Arc, Mutex};
7
8use datadog_protos::agent::TaggerState;
9use rand::RngExt as _;
10use saluki_error::{generic_error, GenericError};
11
12use super::{CapturedTaggerHandle, CapturedTaggerStore};
13
/// Error message used when a replay operation is attempted before any captured tagger handle
/// has been bound to the control (see `DogStatsDReplayControl::bind`).
const UNAVAILABLE_REPLAY_CONTROL_ERROR: &str =
    "DogStatsD replay control is unavailable because the source is not running.";

/// Error message used when a new replay session is requested while another one is still active.
const REPLAY_ALREADY_IN_PROGRESS_ERROR: &str = "DogStatsD replay already in progress.";

/// Default replay loop count.
///
/// NOTE(review): not referenced anywhere in this file; presumably the number of times a capture
/// is replayed when the caller does not specify one -- confirm at the use site.
pub const DEFAULT_REPLAY_LOOPS: u32 = 1;
21
/// Describes a replay session that was successfully started.
///
/// Returned by `DogStatsDReplayControl::start_session`; the `id` must be handed back to
/// `DogStatsDReplayControl::finish_session` to end the session.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplaySession {
    /// Identifier of the session: a random 128-bit value rendered as 32 lowercase hex digits.
    pub id: String,
}
28
/// Handle used to start and finish DogStatsD replay sessions.
///
/// Cloning is cheap: every clone points at the same mutex-protected state, so a session started
/// through one clone is visible through all of them. The default value has no captured tagger
/// bound and no active session.
#[derive(Clone, Default)]
pub struct DogStatsDReplayControl {
    /// Shared control state, guarded by a mutex.
    inner: Arc<Mutex<ReplayControlState>>,
}
37
/// Mutable state shared by all clones of `DogStatsDReplayControl`.
#[derive(Default)]
struct ReplayControlState {
    /// Captured tagger handle installed by `bind`; `None` until a handle has been bound.
    captured_tagger: Option<CapturedTaggerHandle>,
    /// Identifier of the replay session currently in progress; `None` when no session is active.
    active_session_id: Option<String>,
}
43
44impl DogStatsDReplayControl {
45 pub fn new() -> Self {
47 Self::default()
48 }
49
50 pub(crate) fn bind(&self, captured_tagger: CapturedTaggerHandle) {
52 let mut state = self.inner.lock().expect("replay control mutex poisoned");
53 state.captured_tagger = Some(captured_tagger);
54 }
55
56 pub fn start_session(&self, tagger_state: Option<TaggerState>) -> Result<ReplaySession, GenericError> {
63 let mut state = self.inner.lock().expect("replay control mutex poisoned");
64 let captured_tagger = state
65 .captured_tagger
66 .clone()
67 .ok_or_else(|| generic_error!("{}", UNAVAILABLE_REPLAY_CONTROL_ERROR))?;
68
69 if state.active_session_id.is_some() {
70 return Err(generic_error!("{}", REPLAY_ALREADY_IN_PROGRESS_ERROR));
71 }
72
73 let session = ReplaySession {
74 id: new_replay_session_id(),
75 };
76
77 let captured_store = tagger_state.map(CapturedTaggerStore::from_tagger_state);
78 captured_tagger.set_current(captured_store);
79 state.active_session_id = Some(session.id.clone());
80
81 Ok(session)
82 }
83
84 pub fn finish_session(&self, session_id: &str) -> Result<(), GenericError> {
91 let mut state = self.inner.lock().expect("replay control mutex poisoned");
92 let captured_tagger = state
93 .captured_tagger
94 .clone()
95 .ok_or_else(|| generic_error!("{}", UNAVAILABLE_REPLAY_CONTROL_ERROR))?;
96
97 match state.active_session_id.as_deref() {
98 Some(active_session_id) if active_session_id == session_id => {
99 captured_tagger.set_current(None);
100 state.active_session_id = None;
101 Ok(())
102 }
103 Some(active_session_id) => Err(generic_error!(
104 "DogStatsD replay session '{}' does not own active replay session '{}'.",
105 session_id,
106 active_session_id
107 )),
108 None => Ok(()),
109 }
110 }
111}
112
113fn new_replay_session_id() -> String {
114 format!("{:032x}", rand::rng().random::<u128>())
115}
116
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use datadog_protos::agent::{Entity as ProtoEntity, TaggerState};
    use saluki_context::origin::OriginTagCardinality;

    use super::*;

    /// Builds a tagger state holding a single container entity tagged `env:prod`, reachable
    /// through PID 99.
    fn make_state() -> TaggerState {
        let entity = ProtoEntity {
            low_cardinality_tags: vec!["env:prod".into()],
            ..Default::default()
        };

        let mut entities = HashMap::new();
        entities.insert("container_id://container-xyz".to_string(), entity);

        let mut pid_map = HashMap::new();
        pid_map.insert(99, "container_id://container-xyz".to_string());

        TaggerState {
            state: entities,
            pid_map,
            duration: 0,
        }
    }

    /// Creates a control bound to a fresh captured tagger handle and returns both, so tests can
    /// drive the control and observe the handle.
    fn bound_control() -> (DogStatsDReplayControl, CapturedTaggerHandle) {
        let captured_tagger = CapturedTaggerHandle::new();
        let control = DogStatsDReplayControl::new();
        control.bind(captured_tagger.clone());
        (control, captured_tagger)
    }

    #[test]
    fn control_requires_bound_store() {
        let control = DogStatsDReplayControl::new();
        let err = control
            .start_session(Some(make_state()))
            .expect_err("unbound control should fail");
        assert_eq!(err.to_string(), UNAVAILABLE_REPLAY_CONTROL_ERROR);
    }

    #[test]
    fn finish_session_requires_bound_store() {
        let control = DogStatsDReplayControl::new();
        let err = control
            .finish_session("any-session")
            .expect_err("unbound control should fail");
        assert_eq!(err.to_string(), UNAVAILABLE_REPLAY_CONTROL_ERROR);
    }

    #[test]
    fn session_start_and_finish_update_bound_store() {
        let (control, captured_tagger) = bound_control();

        let session = control.start_session(Some(make_state())).expect("session should start");
        let store = captured_tagger.current().expect("state should be set");
        assert!(store.lookup(99, OriginTagCardinality::Low).is_some());

        control
            .finish_session(&session.id)
            .expect("matching session should finish");
        assert!(captured_tagger.current().is_none());
    }

    #[test]
    fn session_rejects_concurrent_start() {
        let (control, captured_tagger) = bound_control();

        let _session = control
            .start_session(Some(make_state()))
            .expect("first session should start");
        let err = control
            .start_session(Some(make_state()))
            .expect_err("second session should fail");
        // Compare against the shared constant so the test tracks the real message.
        assert_eq!(err.to_string(), REPLAY_ALREADY_IN_PROGRESS_ERROR);

        // The rejected start must not disturb the state installed by the first session.
        assert!(captured_tagger.current().is_some());
    }

    #[test]
    fn finish_session_rejects_non_owner() {
        let (control, captured_tagger) = bound_control();

        let session = control.start_session(Some(make_state())).expect("session should start");
        let err = control
            .finish_session("wrong-session")
            .expect_err("non-owner should fail");
        assert!(err.to_string().contains("does not own"));

        assert!(captured_tagger.current().is_some());
        control
            .finish_session(&session.id)
            .expect("matching session should finish");
        assert!(captured_tagger.current().is_none());
    }

    #[test]
    fn finish_session_without_active_session_is_noop() {
        let (control, _captured_tagger) = bound_control();

        control
            .finish_session("stale-session")
            .expect("finishing with no active session should succeed");

        // The control must still be usable afterwards.
        control
            .start_session(Some(make_state()))
            .expect("session should start after a no-op finish");
    }

    #[test]
    fn session_can_restart_after_finish() {
        let (control, captured_tagger) = bound_control();

        let first = control
            .start_session(Some(make_state()))
            .expect("first session should start");
        control
            .finish_session(&first.id)
            .expect("first session should finish");

        let second = control
            .start_session(Some(make_state()))
            .expect("new session should start once the previous one finished");
        assert!(captured_tagger.current().is_some());

        control
            .finish_session(&second.id)
            .expect("second session should finish");
        assert!(captured_tagger.current().is_none());
    }
}