saluki_components/sources/dogstatsd/replay/
replay_api.rs1use axum::body::Bytes;
7use datadog_protos::agent::TaggerState;
8use prost::Message as _;
9use saluki_api::{
10 extract::{Path, State},
11 routing::{delete, post, Router},
12 APIHandler, Json, StatusCode,
13};
14use serde::Serialize;
15
16use super::DogStatsDReplayControl;
17
18#[derive(Clone)]
20pub struct DogStatsDReplayAPIHandler {
21 replay_control: DogStatsDReplayControl,
22}
23
24impl DogStatsDReplayAPIHandler {
25 pub fn new(replay_control: DogStatsDReplayControl) -> Self {
27 Self { replay_control }
28 }
29
30 async fn start_session_handler(
31 State(replay_control): State<DogStatsDReplayControl>, body: Bytes,
32 ) -> Result<Json<ReplaySessionResponseBody>, (StatusCode, String)> {
33 let state = if body.is_empty() {
34 None
35 } else {
36 Some(
37 TaggerState::decode(body)
38 .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid tagger state: {}", e)))?,
39 )
40 };
41
42 let session = replay_control.start_session(state).map_err(map_replay_control_error)?;
43
44 Ok(Json(ReplaySessionResponseBody { session_id: session.id }))
45 }
46
47 async fn finish_session_handler(
48 State(replay_control): State<DogStatsDReplayControl>, Path(session_id): Path<String>,
49 ) -> Result<StatusCode, (StatusCode, String)> {
50 replay_control
51 .finish_session(&session_id)
52 .map_err(map_replay_control_error)?;
53
54 Ok(StatusCode::NO_CONTENT)
55 }
56}
57
58#[derive(Debug, Serialize)]
60pub struct ReplaySessionResponseBody {
61 pub session_id: String,
63}
64
65fn map_replay_control_error(err: saluki_error::GenericError) -> (StatusCode, String) {
66 let message = err.to_string();
67 let status = if message.contains("replay already in progress") || message.contains("does not own") {
68 StatusCode::CONFLICT
69 } else {
70 StatusCode::PRECONDITION_FAILED
71 };
72 (status, message)
73}
74
75impl APIHandler for DogStatsDReplayAPIHandler {
76 type State = DogStatsDReplayControl;
77
78 fn generate_initial_state(&self) -> Self::State {
79 self.replay_control.clone()
80 }
81
82 fn generate_routes(&self) -> Router<Self::State> {
83 Router::new()
84 .route("/dogstatsd/replay/session", post(Self::start_session_handler))
85 .route(
86 "/dogstatsd/replay/session/{session_id}",
87 delete(Self::finish_session_handler),
88 )
89 }
90}
91
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use datadog_protos::agent::{Entity as ProtoEntity, TaggerState};
    use prost::Message as _;
    use saluki_context::origin::OriginTagCardinality;

    use super::*;
    use crate::sources::dogstatsd::replay::CapturedTaggerHandle;

    /// Entity ID shared by the entity map and the PID map of the fixture.
    const ENTITY_ID: &str = "container_id://container-xyz";

    /// Builds a tagger state with a single entity carrying one low-cardinality tag, mapped from PID 99.
    fn make_state() -> TaggerState {
        let entity = ProtoEntity {
            low_cardinality_tags: vec!["env:prod".into()],
            ..Default::default()
        };

        TaggerState {
            state: HashMap::from([(ENTITY_ID.to_string(), entity)]),
            pid_map: HashMap::from([(99, ENTITY_ID.to_string())]),
            duration: 0,
        }
    }

    /// Encodes the fixture tagger state as a request body.
    fn encoded_state() -> Bytes {
        Bytes::from(make_state().encode_to_vec())
    }

    /// Creates a replay control bound to the given captured tagger handle.
    fn bound_control(tagger: CapturedTaggerHandle) -> DogStatsDReplayControl {
        let control = DogStatsDReplayControl::new();
        control.bind(tagger);
        control
    }

    #[tokio::test]
    async fn start_session_rejects_invalid_protobuf() {
        let control = DogStatsDReplayControl::new();
        let garbage = Bytes::from_static(b"bad");

        let (status, _message) = DogStatsDReplayAPIHandler::start_session_handler(State(control), garbage)
            .await
            .expect_err("invalid protobuf should fail");

        assert_eq!(status, StatusCode::BAD_REQUEST);
    }

    #[tokio::test]
    async fn start_and_finish_session_update_control() {
        let tagger = CapturedTaggerHandle::new();
        let control = bound_control(tagger.clone());

        // Starting a session should hand back a non-empty ID and populate the captured tagger.
        let Json(started) = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), encoded_state())
            .await
            .expect("session should start");
        assert!(!started.session_id.is_empty());

        let store = tagger.current().expect("state should be set");
        assert!(store.lookup(99, OriginTagCardinality::Low).is_some());

        // Finishing the session should clear the captured tagger again.
        let status = DogStatsDReplayAPIHandler::finish_session_handler(State(control), Path(started.session_id))
            .await
            .expect("finish should succeed");
        assert_eq!(status, StatusCode::NO_CONTENT);
        assert!(tagger.current().is_none());
    }

    #[tokio::test]
    async fn start_session_rejects_concurrent_session() {
        let control = bound_control(CapturedTaggerHandle::new());

        let _ = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), encoded_state())
            .await
            .expect("first session should start");

        // A second start while the first session is still active must be rejected.
        let (status, _message) = DogStatsDReplayAPIHandler::start_session_handler(State(control), encoded_state())
            .await
            .expect_err("second session should fail");
        assert_eq!(status, StatusCode::CONFLICT);
    }

    #[tokio::test]
    async fn finish_session_rejects_non_owner() {
        let tagger = CapturedTaggerHandle::new();
        let control = bound_control(tagger.clone());

        let Json(started) = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), encoded_state())
            .await
            .expect("session should start");

        // Finishing with an unknown session ID must fail and leave the captured state in place.
        let (status, _message) =
            DogStatsDReplayAPIHandler::finish_session_handler(State(control.clone()), Path("wrong".to_string()))
                .await
                .expect_err("non-owner should fail");
        assert_eq!(status, StatusCode::CONFLICT);
        assert!(tagger.current().is_some());

        // The real owner can still finish the session afterwards.
        DogStatsDReplayAPIHandler::finish_session_handler(State(control), Path(started.session_id))
            .await
            .expect("owner should finish");
        assert!(tagger.current().is_none());
    }
}