saluki_components/destinations/dsd_stats/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_api::{
7    extract::{Query, State},
8    routing::{get, Router},
9    APIHandler, StatusCode,
10};
11use saluki_common::time::get_coarse_unix_timestamp;
12use saluki_context::tags::SharedTagSet;
13use saluki_core::{
14    components::{
15        destinations::{Destination, DestinationBuilder, DestinationContext},
16        ComponentContext,
17    },
18    data_model::event::{Event, EventType},
19};
20use saluki_error::GenericError;
21use serde::{Deserialize, Serialize, Serializer};
22use serde_json;
23use stringtheory::MetaString;
24use tokio::sync::{Mutex, OwnedMutexGuard};
25use tokio::time::{Duration, Instant};
26use tokio::{select, sync::mpsc, sync::oneshot};
27
28type StatsRequestReceiver = mpsc::Receiver<(oneshot::Sender<StatsResponse>, u64)>;
29
30#[derive(Debug, Default, Clone, Serialize)]
31pub struct MetricSample {
32    count: u64,
33    last_seen: u64,
34}
35#[derive(Serialize)]
36enum StatsResponse {
37    /// An existing statistics collection request is running.
38    AlreadyRunning {
39        /// Number of seconds to wait before trying again.
40        try_after: u64,
41    },
42
43    Statistics(CollectedStatistics),
44}
45
46#[derive(Serialize)]
47struct CollectedStatistics {
48    /// Start time of the collected metrics, as a Unix timestamp.
49    start_time_unix: u64,
50
51    /// End time of the collected metrics, as a Unix timestamp.
52    end_time_unix: u64,
53
54    /// Collected statistics.
55    stats: FlattenedStats,
56}
57
58#[derive(Serialize)]
59struct FlattenedMetricStat<'a> {
60    #[serde(flatten)]
61    context: &'a ContextNoOrigin,
62
63    #[serde(flatten)]
64    stats: &'a MetricSample,
65}
66
67struct FlattenedStats(HashMap<ContextNoOrigin, MetricSample>);
68
69impl Serialize for FlattenedStats {
70    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
71    where
72        S: Serializer,
73    {
74        serializer.collect_seq(
75            self.0
76                .iter()
77                .map(|(context, stats)| FlattenedMetricStat { context, stats }),
78        )
79    }
80}
81
82/// Configuration for DogStatsD statistics destination and API handler.
83#[derive(Clone)]
84pub struct DogStatsDStatisticsConfiguration {
85    api_handler: DogStatsDAPIHandler,
86    rx: Arc<Mutex<StatsRequestReceiver>>,
87}
88/// State for the DogStatsD API handler.
89#[derive(Clone)]
90pub struct DogStatsDAPIHandlerState {
91    tx: Arc<mpsc::Sender<(oneshot::Sender<StatsResponse>, u64)>>,
92}
93
94/// API handler for dogstatsd stats endpoint.
95#[derive(Clone)]
96pub struct DogStatsDAPIHandler {
97    state: DogStatsDAPIHandlerState,
98}
99
100/// DogStatsD destination that collects metrics and processes statistics.
101pub struct DogStatsDStats {
102    rx: OwnedMutexGuard<StatsRequestReceiver>,
103}
104
105#[async_trait::async_trait]
106impl Destination for DogStatsDStats {
107    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
108        let mut health = context.take_health_handle();
109        let mut collection_active = false;
110        let mut stats_response_tx: Option<tokio::sync::oneshot::Sender<StatsResponse>> = None;
111        let mut current_stats: Option<HashMap<ContextNoOrigin, MetricSample>> = None;
112        let mut stats_collection_start_time = 0;
113        let mut stats_collection_end_time = 0;
114        let collection_done = tokio::time::sleep(std::time::Duration::ZERO);
115        tokio::pin!(collection_done);
116
117        health.mark_ready();
118
119        loop {
120            select! {
121                _ = health.live() => {
122                    continue
123                },
124                Some((response_tx, collection_period_secs)) = self.rx.recv() => {
125                    if collection_active {
126                        // We're already collecting statistics for another stats request
127                        // so inform the caller they need to try again later.
128                        let try_after = stats_collection_end_time - get_coarse_unix_timestamp();
129
130                        // We don't care if we can successfully send back a response or not.
131                        let _ = response_tx.send(StatsResponse::AlreadyRunning { try_after });
132                    } else {
133                        // Start collection.
134                        collection_active = true;
135                        stats_collection_start_time = get_coarse_unix_timestamp();
136                        stats_collection_end_time = stats_collection_start_time + collection_period_secs;
137                        stats_response_tx = Some(response_tx);
138                        current_stats = Some(HashMap::new());
139                        collection_done.as_mut().reset(Instant::now() + Duration::from_secs(collection_period_secs));
140                    }
141                },
142                maybe_events = context.events().next() => match maybe_events {
143                    Some(events) => {
144                        if let Some(stats) = current_stats.as_mut() {
145                            // We're actively collecting, so process the metrics.
146                            for event in events {
147                                if let Event::Metric(metric) = event {
148
149                                    let context = metric.context();
150                                    let new_context = ContextNoOrigin {
151                                        name: context.name().clone(),
152                                        tags: context.tags().clone(),
153                                    };
154
155                                    let timestamp = get_coarse_unix_timestamp();
156                                    let sample = stats.entry(new_context).or_default();
157                                    sample.count += 1;
158                                    sample.last_seen = timestamp;
159
160                            }
161                        }
162                     }},
163                     None => break,
164                },
165                _ = &mut collection_done, if collection_active => {
166                    collection_active = false;
167
168                    // Build the response.
169                    let stats = match current_stats.take() {
170                        Some(stats) => stats,
171                        None => continue,
172                    };
173
174                    let response = StatsResponse::Statistics(CollectedStatistics {
175                        start_time_unix: stats_collection_start_time,
176                        end_time_unix: stats_collection_end_time,
177                        stats: FlattenedStats(stats),
178                    });
179
180                    let response_tx = match stats_response_tx.take() {
181                        Some(tx) => tx,
182                        None => continue,
183                    };
184
185                    // We don't care if we can successfully send back a response or not.
186                    let _ = response_tx.send(response);
187                }
188
189            }
190        }
191        Ok(())
192    }
193}
194
195#[derive(Eq, Hash, PartialEq, Serialize)]
196struct ContextNoOrigin {
197    name: MetaString,
198    tags: SharedTagSet,
199}
200#[derive(Deserialize)]
201struct StatsQueryParams {
202    collection_duration_secs: u64,
203}
204
205impl DogStatsDAPIHandler {
206    async fn stats_handler(
207        State(state): State<DogStatsDAPIHandlerState>, Query(query): Query<StatsQueryParams>,
208    ) -> (StatusCode, String) {
209        const MAXIMUM_COLLECTION_DURATION_SECS: u64 = 600;
210        if query.collection_duration_secs > MAXIMUM_COLLECTION_DURATION_SECS {
211            return (
212                StatusCode::BAD_REQUEST,
213                format!(
214                    "Collection duration cannot be greater than {} seconds.",
215                    MAXIMUM_COLLECTION_DURATION_SECS
216                ),
217            );
218        }
219
220        let (oneshot_tx, oneshot_rx) = oneshot::channel();
221
222        state
223            .tx
224            .send((oneshot_tx, query.collection_duration_secs))
225            .await
226            .unwrap(); // TODO: use config to set collection period
227
228        match oneshot_rx.await {
229            Ok(stats) => match stats {
230                StatsResponse::Statistics(collected_stats) => match serde_json::to_string(&collected_stats) {
231                    Ok(json) => (StatusCode::OK, json),
232                    Err(e) => (
233                        StatusCode::INTERNAL_SERVER_ERROR,
234                        format!("Failed to serialize stats: {}", e),
235                    ),
236                },
237                StatsResponse::AlreadyRunning { try_after } => (
238                    StatusCode::TOO_MANY_REQUESTS,
239                    format!(
240                        "Statistics collection already active. Please try again in {} seconds.",
241                        try_after
242                    ),
243                ),
244            },
245            Err(_) => (
246                StatusCode::INTERNAL_SERVER_ERROR,
247                "Failed to collect statistics.".to_string(),
248            ),
249        }
250    }
251}
252
253impl APIHandler for DogStatsDAPIHandler {
254    type State = DogStatsDAPIHandlerState;
255
256    fn generate_initial_state(&self) -> Self::State {
257        self.state.clone()
258    }
259
260    fn generate_routes(&self) -> Router<Self::State> {
261        Router::new().route("/dogstatsd/stats", get(Self::stats_handler))
262    }
263}
264
265impl DogStatsDStatisticsConfiguration {
266    /// Creates a new 'DogStatsDStatisticsConfiguration' from the given configuration.
267    pub fn from_configuration() -> Result<Self, GenericError> {
268        let (tx, rx) = mpsc::channel(4);
269        let state = DogStatsDAPIHandlerState { tx: Arc::new(tx) };
270        let handler = DogStatsDAPIHandler { state };
271
272        Ok(Self {
273            api_handler: handler,
274            rx: Arc::new(Mutex::new(rx)),
275        })
276    }
277
278    /// Returns an API handler for DogStatsD API.
279    pub fn api_handler(&self) -> DogStatsDAPIHandler {
280        self.api_handler.clone()
281    }
282}
283
284#[async_trait]
285impl DestinationBuilder for DogStatsDStatisticsConfiguration {
286    fn input_event_type(&self) -> EventType {
287        EventType::Metric
288    }
289
290    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
291        let rx = self.rx.clone().try_lock_owned()?;
292        Ok(Box::new(DogStatsDStats { rx }))
293    }
294}
295
296impl MemoryBounds for DogStatsDStatisticsConfiguration {
297    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
298        builder
299            .minimum()
300            .with_single_value::<DogStatsDStats>("component struct");
301    }
302}