saluki_components/destinations/dsd_stats/
mod.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_api::{
7 extract::{Query, State},
8 routing::{get, Router},
9 APIHandler, StatusCode,
10};
11use saluki_common::time::get_coarse_unix_timestamp;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14 components::{
15 destinations::{Destination, DestinationBuilder, DestinationContext},
16 ComponentContext,
17 },
18 data_model::event::{Event, EventType},
19};
20use saluki_error::GenericError;
21use serde::{Deserialize, Serialize, Serializer};
22use serde_json;
23use stringtheory::MetaString;
24use tokio::time::{sleep, Duration, Instant};
25use tokio::{
26 pin,
27 sync::{Mutex, OwnedMutexGuard},
28};
29use tokio::{select, sync::mpsc, sync::oneshot};
30
31type StatsRequestReceiver = mpsc::Receiver<(oneshot::Sender<StatsResponse>, u64)>;
32
33#[derive(Debug, Default, Clone, Serialize)]
34pub struct MetricSample {
35 count: u64,
36 last_seen: u64,
37}
38#[derive(Serialize)]
39enum StatsResponse {
40 AlreadyRunning {
42 try_after: u64,
44 },
45
46 Statistics(CollectedStatistics),
47}
48
49#[derive(Serialize)]
50struct CollectedStatistics {
51 start_time_unix: u64,
53
54 end_time_unix: u64,
56
57 stats: FlattenedStats,
59}
60
61#[derive(Serialize)]
62struct FlattenedMetricStat<'a> {
63 #[serde(flatten)]
64 context: &'a ContextNoOrigin,
65
66 #[serde(flatten)]
67 stats: &'a MetricSample,
68}
69
70struct FlattenedStats(HashMap<ContextNoOrigin, MetricSample>);
71
72impl Serialize for FlattenedStats {
73 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
74 where
75 S: Serializer,
76 {
77 serializer.collect_seq(
78 self.0
79 .iter()
80 .map(|(context, stats)| FlattenedMetricStat { context, stats }),
81 )
82 }
83}
84
85#[derive(Clone)]
87pub struct DogStatsDStatisticsConfiguration {
88 api_handler: DogStatsDStatsAPIHandler,
89 rx: Arc<Mutex<StatsRequestReceiver>>,
90}
91#[derive(Clone)]
93pub struct DogStatsDStatsAPIHandlerState {
94 tx: Arc<mpsc::Sender<(oneshot::Sender<StatsResponse>, u64)>>,
95}
96
97#[derive(Clone)]
99pub struct DogStatsDStatsAPIHandler {
100 state: DogStatsDStatsAPIHandlerState,
101}
102
103pub struct DogStatsDStats {
105 rx: OwnedMutexGuard<StatsRequestReceiver>,
106}
107
108#[async_trait::async_trait]
109impl Destination for DogStatsDStats {
110 async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
111 let mut health = context.take_health_handle();
112 let mut collection_active = false;
113 let mut stats_response_tx: Option<tokio::sync::oneshot::Sender<StatsResponse>> = None;
114 let mut current_stats: Option<HashMap<ContextNoOrigin, MetricSample>> = None;
115 let mut stats_collection_start_time = 0;
116 let mut stats_collection_end_time = 0;
117 let collection_done = sleep(std::time::Duration::ZERO);
118 pin!(collection_done);
119
120 health.mark_ready();
121
122 loop {
123 select! {
124 _ = health.live() => {
125 continue
126 },
127 Some((response_tx, collection_period_secs)) = self.rx.recv() => {
128 if collection_active {
129 let try_after = stats_collection_end_time - get_coarse_unix_timestamp();
132
133 let _ = response_tx.send(StatsResponse::AlreadyRunning { try_after });
135 } else {
136 collection_active = true;
138 stats_collection_start_time = get_coarse_unix_timestamp();
139 stats_collection_end_time = stats_collection_start_time + collection_period_secs;
140 stats_response_tx = Some(response_tx);
141 current_stats = Some(HashMap::new());
142 collection_done.as_mut().reset(Instant::now() + Duration::from_secs(collection_period_secs));
143 }
144 },
145 maybe_events = context.events().next() => match maybe_events {
146 Some(events) => {
147 if let Some(stats) = current_stats.as_mut() {
148 for event in events {
150 if let Event::Metric(metric) = event {
151
152 let context = metric.context();
153 let new_context = ContextNoOrigin {
154 name: context.name().clone(),
155 tags: context.tags().clone(),
156 };
157
158 let timestamp = get_coarse_unix_timestamp();
159 let sample = stats.entry(new_context).or_default();
160 sample.count += 1;
161 sample.last_seen = timestamp;
162
163 }
164 }
165 }},
166 None => break,
167 },
168 _ = &mut collection_done, if collection_active => {
169 collection_active = false;
170
171 let stats = match current_stats.take() {
173 Some(stats) => stats,
174 None => continue,
175 };
176
177 let response = StatsResponse::Statistics(CollectedStatistics {
178 start_time_unix: stats_collection_start_time,
179 end_time_unix: stats_collection_end_time,
180 stats: FlattenedStats(stats),
181 });
182
183 let response_tx = match stats_response_tx.take() {
184 Some(tx) => tx,
185 None => continue,
186 };
187
188 let _ = response_tx.send(response);
190 }
191
192 }
193 }
194 Ok(())
195 }
196}
197
198#[derive(Eq, Hash, PartialEq, Serialize)]
199struct ContextNoOrigin {
200 name: MetaString,
201 tags: TagSet,
202}
203#[derive(Deserialize)]
204struct StatsQueryParams {
205 collection_duration_secs: u64,
206}
207
208impl DogStatsDStatsAPIHandler {
209 async fn stats_handler(
210 State(state): State<DogStatsDStatsAPIHandlerState>, Query(query): Query<StatsQueryParams>,
211 ) -> (StatusCode, String) {
212 const MAXIMUM_COLLECTION_DURATION_SECS: u64 = 600;
213 if query.collection_duration_secs > MAXIMUM_COLLECTION_DURATION_SECS {
214 return (
215 StatusCode::BAD_REQUEST,
216 format!(
217 "Collection duration cannot be greater than {} seconds.",
218 MAXIMUM_COLLECTION_DURATION_SECS
219 ),
220 );
221 }
222
223 let (oneshot_tx, oneshot_rx) = oneshot::channel();
224
225 state
226 .tx
227 .send((oneshot_tx, query.collection_duration_secs))
228 .await
229 .unwrap(); match oneshot_rx.await {
232 Ok(stats) => match stats {
233 StatsResponse::Statistics(collected_stats) => match serde_json::to_string(&collected_stats) {
234 Ok(json) => (StatusCode::OK, json),
235 Err(e) => (
236 StatusCode::INTERNAL_SERVER_ERROR,
237 format!("Failed to serialize stats: {}", e),
238 ),
239 },
240 StatsResponse::AlreadyRunning { try_after } => (
241 StatusCode::TOO_MANY_REQUESTS,
242 format!(
243 "Statistics collection already active. Please try again in {} seconds.",
244 try_after
245 ),
246 ),
247 },
248 Err(_) => (
249 StatusCode::INTERNAL_SERVER_ERROR,
250 "Failed to collect statistics.".to_string(),
251 ),
252 }
253 }
254}
255
256impl APIHandler for DogStatsDStatsAPIHandler {
257 type State = DogStatsDStatsAPIHandlerState;
258
259 fn generate_initial_state(&self) -> Self::State {
260 self.state.clone()
261 }
262
263 fn generate_routes(&self) -> Router<Self::State> {
264 Router::new().route("/dogstatsd/stats", get(Self::stats_handler))
265 }
266}
267
268impl DogStatsDStatisticsConfiguration {
269 pub fn new() -> Self {
271 let (tx, rx) = mpsc::channel(4);
272 let state = DogStatsDStatsAPIHandlerState { tx: Arc::new(tx) };
273 let handler = DogStatsDStatsAPIHandler { state };
274
275 Self {
276 api_handler: handler,
277 rx: Arc::new(Mutex::new(rx)),
278 }
279 }
280
281 pub fn api_handler(&self) -> DogStatsDStatsAPIHandler {
283 self.api_handler.clone()
284 }
285}
286
287#[async_trait]
288impl DestinationBuilder for DogStatsDStatisticsConfiguration {
289 fn input_event_type(&self) -> EventType {
290 EventType::Metric
291 }
292
293 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
294 let rx = self.rx.clone().try_lock_owned()?;
295 Ok(Box::new(DogStatsDStats { rx }))
296 }
297}
298
299impl MemoryBounds for DogStatsDStatisticsConfiguration {
300 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
301 builder
302 .minimum()
303 .with_single_value::<DogStatsDStats>("component struct");
304 }
305}