saluki_components/destinations/dsd_stats/
mod.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_api::{
7 extract::{Query, State},
8 routing::{get, Router},
9 APIHandler, StatusCode,
10};
11use saluki_common::time::get_coarse_unix_timestamp;
12use saluki_context::tags::SharedTagSet;
13use saluki_core::{
14 components::{
15 destinations::{Destination, DestinationBuilder, DestinationContext},
16 ComponentContext,
17 },
18 data_model::event::{Event, EventType},
19};
20use saluki_error::GenericError;
21use serde::{Deserialize, Serialize, Serializer};
22use serde_json;
23use stringtheory::MetaString;
24use tokio::sync::{Mutex, OwnedMutexGuard};
25use tokio::time::{Duration, Instant};
26use tokio::{select, sync::mpsc, sync::oneshot};
27
28type StatsRequestReceiver = mpsc::Receiver<(oneshot::Sender<StatsResponse>, u64)>;
29
30#[derive(Debug, Default, Clone, Serialize)]
31pub struct MetricSample {
32 count: u64,
33 last_seen: u64,
34}
35#[derive(Serialize)]
36enum StatsResponse {
37 AlreadyRunning {
39 try_after: u64,
41 },
42
43 Statistics(CollectedStatistics),
44}
45
46#[derive(Serialize)]
47struct CollectedStatistics {
48 start_time_unix: u64,
50
51 end_time_unix: u64,
53
54 stats: FlattenedStats,
56}
57
58#[derive(Serialize)]
59struct FlattenedMetricStat<'a> {
60 #[serde(flatten)]
61 context: &'a ContextNoOrigin,
62
63 #[serde(flatten)]
64 stats: &'a MetricSample,
65}
66
67struct FlattenedStats(HashMap<ContextNoOrigin, MetricSample>);
68
69impl Serialize for FlattenedStats {
70 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71 where
72 S: Serializer,
73 {
74 serializer.collect_seq(
75 self.0
76 .iter()
77 .map(|(context, stats)| FlattenedMetricStat { context, stats }),
78 )
79 }
80}
81
82#[derive(Clone)]
84pub struct DogStatsDStatisticsConfiguration {
85 api_handler: DogStatsDAPIHandler,
86 rx: Arc<Mutex<StatsRequestReceiver>>,
87}
88#[derive(Clone)]
90pub struct DogStatsDAPIHandlerState {
91 tx: Arc<mpsc::Sender<(oneshot::Sender<StatsResponse>, u64)>>,
92}
93
94#[derive(Clone)]
96pub struct DogStatsDAPIHandler {
97 state: DogStatsDAPIHandlerState,
98}
99
100pub struct DogStatsDStats {
102 rx: OwnedMutexGuard<StatsRequestReceiver>,
103}
104
105#[async_trait::async_trait]
106impl Destination for DogStatsDStats {
107 async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
108 let mut health = context.take_health_handle();
109 let mut collection_active = false;
110 let mut stats_response_tx: Option<tokio::sync::oneshot::Sender<StatsResponse>> = None;
111 let mut current_stats: Option<HashMap<ContextNoOrigin, MetricSample>> = None;
112 let mut stats_collection_start_time = 0;
113 let mut stats_collection_end_time = 0;
114 let collection_done = tokio::time::sleep(std::time::Duration::ZERO);
115 tokio::pin!(collection_done);
116
117 health.mark_ready();
118
119 loop {
120 select! {
121 _ = health.live() => {
122 continue
123 },
124 Some((response_tx, collection_period_secs)) = self.rx.recv() => {
125 if collection_active {
126 let try_after = stats_collection_end_time - get_coarse_unix_timestamp();
129
130 let _ = response_tx.send(StatsResponse::AlreadyRunning { try_after });
132 } else {
133 collection_active = true;
135 stats_collection_start_time = get_coarse_unix_timestamp();
136 stats_collection_end_time = stats_collection_start_time + collection_period_secs;
137 stats_response_tx = Some(response_tx);
138 current_stats = Some(HashMap::new());
139 collection_done.as_mut().reset(Instant::now() + Duration::from_secs(collection_period_secs));
140 }
141 },
142 maybe_events = context.events().next() => match maybe_events {
143 Some(events) => {
144 if let Some(stats) = current_stats.as_mut() {
145 for event in events {
147 if let Event::Metric(metric) = event {
148
149 let context = metric.context();
150 let new_context = ContextNoOrigin {
151 name: context.name().clone(),
152 tags: context.tags().clone(),
153 };
154
155 let timestamp = get_coarse_unix_timestamp();
156 let sample = stats.entry(new_context).or_default();
157 sample.count += 1;
158 sample.last_seen = timestamp;
159
160 }
161 }
162 }},
163 None => break,
164 },
165 _ = &mut collection_done, if collection_active => {
166 collection_active = false;
167
168 let stats = match current_stats.take() {
170 Some(stats) => stats,
171 None => continue,
172 };
173
174 let response = StatsResponse::Statistics(CollectedStatistics {
175 start_time_unix: stats_collection_start_time,
176 end_time_unix: stats_collection_end_time,
177 stats: FlattenedStats(stats),
178 });
179
180 let response_tx = match stats_response_tx.take() {
181 Some(tx) => tx,
182 None => continue,
183 };
184
185 let _ = response_tx.send(response);
187 }
188
189 }
190 }
191 Ok(())
192 }
193}
194
195#[derive(Eq, Hash, PartialEq, Serialize)]
196struct ContextNoOrigin {
197 name: MetaString,
198 tags: SharedTagSet,
199}
200#[derive(Deserialize)]
201struct StatsQueryParams {
202 collection_duration_secs: u64,
203}
204
205impl DogStatsDAPIHandler {
206 async fn stats_handler(
207 State(state): State<DogStatsDAPIHandlerState>, Query(query): Query<StatsQueryParams>,
208 ) -> (StatusCode, String) {
209 const MAXIMUM_COLLECTION_DURATION_SECS: u64 = 600;
210 if query.collection_duration_secs > MAXIMUM_COLLECTION_DURATION_SECS {
211 return (
212 StatusCode::BAD_REQUEST,
213 format!(
214 "Collection duration cannot be greater than {} seconds.",
215 MAXIMUM_COLLECTION_DURATION_SECS
216 ),
217 );
218 }
219
220 let (oneshot_tx, oneshot_rx) = oneshot::channel();
221
222 state
223 .tx
224 .send((oneshot_tx, query.collection_duration_secs))
225 .await
226 .unwrap(); match oneshot_rx.await {
229 Ok(stats) => match stats {
230 StatsResponse::Statistics(collected_stats) => match serde_json::to_string(&collected_stats) {
231 Ok(json) => (StatusCode::OK, json),
232 Err(e) => (
233 StatusCode::INTERNAL_SERVER_ERROR,
234 format!("Failed to serialize stats: {}", e),
235 ),
236 },
237 StatsResponse::AlreadyRunning { try_after } => (
238 StatusCode::TOO_MANY_REQUESTS,
239 format!(
240 "Statistics collection already active. Please try again in {} seconds.",
241 try_after
242 ),
243 ),
244 },
245 Err(_) => (
246 StatusCode::INTERNAL_SERVER_ERROR,
247 "Failed to collect statistics.".to_string(),
248 ),
249 }
250 }
251}
252
253impl APIHandler for DogStatsDAPIHandler {
254 type State = DogStatsDAPIHandlerState;
255
256 fn generate_initial_state(&self) -> Self::State {
257 self.state.clone()
258 }
259
260 fn generate_routes(&self) -> Router<Self::State> {
261 Router::new().route("/dogstatsd/stats", get(Self::stats_handler))
262 }
263}
264
265impl DogStatsDStatisticsConfiguration {
266 pub fn from_configuration() -> Result<Self, GenericError> {
268 let (tx, rx) = mpsc::channel(4);
269 let state = DogStatsDAPIHandlerState { tx: Arc::new(tx) };
270 let handler = DogStatsDAPIHandler { state };
271
272 Ok(Self {
273 api_handler: handler,
274 rx: Arc::new(Mutex::new(rx)),
275 })
276 }
277
278 pub fn api_handler(&self) -> DogStatsDAPIHandler {
280 self.api_handler.clone()
281 }
282}
283
284#[async_trait]
285impl DestinationBuilder for DogStatsDStatisticsConfiguration {
286 fn input_event_type(&self) -> EventType {
287 EventType::Metric
288 }
289
290 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
291 let rx = self.rx.clone().try_lock_owned()?;
292 Ok(Box::new(DogStatsDStats { rx }))
293 }
294}
295
296impl MemoryBounds for DogStatsDStatisticsConfiguration {
297 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
298 builder
299 .minimum()
300 .with_single_value::<DogStatsDStats>("component struct");
301 }
302}