Skip to main content

saluki_components/sources/dogstatsd/replay/
replay_api.rs

1//! HTTP API handler for DogStatsD replay sessions.
2//!
3//! A replay client starts a session before sending replay packets and finishes the same session when replay ends. The
4//! ADP process keeps the captured tagger snapshot only for the active session.
5
6use axum::body::Bytes;
7use datadog_protos::agent::TaggerState;
8use prost::Message as _;
9use saluki_api::{
10    extract::{Path, State},
11    routing::{delete, post, Router},
12    APIHandler, Json, StatusCode,
13};
14use serde::Serialize;
15
16use super::DogStatsDReplayControl;
17
18/// API handler for the DogStatsD replay session control surface.
19#[derive(Clone)]
20pub struct DogStatsDReplayAPIHandler {
21    replay_control: DogStatsDReplayControl,
22}
23
24impl DogStatsDReplayAPIHandler {
25    /// Creates a new handler bound to the given replay control.
26    pub fn new(replay_control: DogStatsDReplayControl) -> Self {
27        Self { replay_control }
28    }
29
30    async fn start_session_handler(
31        State(replay_control): State<DogStatsDReplayControl>, body: Bytes,
32    ) -> Result<Json<ReplaySessionResponseBody>, (StatusCode, String)> {
33        let state = if body.is_empty() {
34            None
35        } else {
36            Some(
37                TaggerState::decode(body)
38                    .map_err(|e| (StatusCode::BAD_REQUEST, format!("Invalid tagger state: {}", e)))?,
39            )
40        };
41
42        let session = replay_control.start_session(state).map_err(map_replay_control_error)?;
43
44        Ok(Json(ReplaySessionResponseBody { session_id: session.id }))
45    }
46
47    async fn finish_session_handler(
48        State(replay_control): State<DogStatsDReplayControl>, Path(session_id): Path<String>,
49    ) -> Result<StatusCode, (StatusCode, String)> {
50        replay_control
51            .finish_session(&session_id)
52            .map_err(map_replay_control_error)?;
53
54        Ok(StatusCode::NO_CONTENT)
55    }
56}
57
58/// Response body for `POST /dogstatsd/replay/session`.
59#[derive(Debug, Serialize)]
60pub struct ReplaySessionResponseBody {
61    /// Opaque session identifier the replay client must use when finishing the replay.
62    pub session_id: String,
63}
64
65fn map_replay_control_error(err: saluki_error::GenericError) -> (StatusCode, String) {
66    let message = err.to_string();
67    let status = if message.contains("replay already in progress") || message.contains("does not own") {
68        StatusCode::CONFLICT
69    } else {
70        StatusCode::PRECONDITION_FAILED
71    };
72    (status, message)
73}
74
75impl APIHandler for DogStatsDReplayAPIHandler {
76    type State = DogStatsDReplayControl;
77
78    fn generate_initial_state(&self) -> Self::State {
79        self.replay_control.clone()
80    }
81
82    fn generate_routes(&self) -> Router<Self::State> {
83        Router::new()
84            .route("/dogstatsd/replay/session", post(Self::start_session_handler))
85            .route(
86                "/dogstatsd/replay/session/{session_id}",
87                delete(Self::finish_session_handler),
88            )
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use std::collections::HashMap;
95
96    use datadog_protos::agent::{Entity as ProtoEntity, TaggerState};
97    use prost::Message as _;
98    use saluki_context::origin::OriginTagCardinality;
99
100    use super::*;
101    use crate::sources::dogstatsd::replay::CapturedTaggerHandle;
102
103    fn make_state() -> TaggerState {
104        let entity = ProtoEntity {
105            low_cardinality_tags: vec!["env:prod".into()],
106            ..Default::default()
107        };
108
109        let mut entities = HashMap::new();
110        entities.insert("container_id://container-xyz".to_string(), entity);
111
112        let mut pid_map = HashMap::new();
113        pid_map.insert(99, "container_id://container-xyz".to_string());
114
115        TaggerState {
116            state: entities,
117            pid_map,
118            duration: 0,
119        }
120    }
121
122    #[tokio::test]
123    async fn start_session_rejects_invalid_protobuf() {
124        let control = DogStatsDReplayControl::new();
125        let err = DogStatsDReplayAPIHandler::start_session_handler(State(control), Bytes::from_static(b"bad"))
126            .await
127            .expect_err("invalid protobuf should fail");
128
129        assert_eq!(err.0, StatusCode::BAD_REQUEST);
130    }
131
132    #[tokio::test]
133    async fn start_and_finish_session_update_control() {
134        let captured_tagger = CapturedTaggerHandle::new();
135        let control = DogStatsDReplayControl::new();
136        control.bind(captured_tagger.clone());
137
138        let body = Bytes::from(make_state().encode_to_vec());
139        let Json(session) = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), body)
140            .await
141            .expect("session should start");
142        assert!(!session.session_id.is_empty());
143
144        let store = captured_tagger.current().expect("state should be set");
145        assert!(store.lookup(99, OriginTagCardinality::Low).is_some());
146
147        let finish_status = DogStatsDReplayAPIHandler::finish_session_handler(State(control), Path(session.session_id))
148            .await
149            .expect("finish should succeed");
150        assert_eq!(finish_status, StatusCode::NO_CONTENT);
151        assert!(captured_tagger.current().is_none());
152    }
153
154    #[tokio::test]
155    async fn start_session_rejects_concurrent_session() {
156        let captured_tagger = CapturedTaggerHandle::new();
157        let control = DogStatsDReplayControl::new();
158        control.bind(captured_tagger);
159
160        let body = Bytes::from(make_state().encode_to_vec());
161        let _ = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), body)
162            .await
163            .expect("first session should start");
164
165        let err =
166            DogStatsDReplayAPIHandler::start_session_handler(State(control), Bytes::from(make_state().encode_to_vec()))
167                .await
168                .expect_err("second session should fail");
169        assert_eq!(err.0, StatusCode::CONFLICT);
170    }
171
172    #[tokio::test]
173    async fn finish_session_rejects_non_owner() {
174        let captured_tagger = CapturedTaggerHandle::new();
175        let control = DogStatsDReplayControl::new();
176        control.bind(captured_tagger.clone());
177
178        let body = Bytes::from(make_state().encode_to_vec());
179        let Json(session) = DogStatsDReplayAPIHandler::start_session_handler(State(control.clone()), body)
180            .await
181            .expect("session should start");
182
183        let err = DogStatsDReplayAPIHandler::finish_session_handler(State(control.clone()), Path("wrong".to_string()))
184            .await
185            .expect_err("non-owner should fail");
186        assert_eq!(err.0, StatusCode::CONFLICT);
187        assert!(captured_tagger.current().is_some());
188
189        DogStatsDReplayAPIHandler::finish_session_handler(State(control), Path(session.session_id))
190            .await
191            .expect("owner should finish");
192        assert!(captured_tagger.current().is_none());
193    }
194}