Skip to main content

saluki_components/destinations/dsd_stats/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
6use saluki_api::{
7    extract::{Query, State},
8    routing::{get, Router},
9    APIHandler, StatusCode,
10};
11use saluki_common::time::get_coarse_unix_timestamp;
12use saluki_context::tags::TagSet;
13use saluki_core::{
14    components::{
15        destinations::{Destination, DestinationBuilder, DestinationContext},
16        ComponentContext,
17    },
18    data_model::event::{Event, EventType},
19};
20use saluki_error::GenericError;
21use serde::{Deserialize, Serialize, Serializer};
22use serde_json;
23use stringtheory::MetaString;
24use tokio::time::{sleep, Duration, Instant};
25use tokio::{
26    pin,
27    sync::{Mutex, OwnedMutexGuard},
28};
29use tokio::{select, sync::mpsc, sync::oneshot};
30
31type StatsRequestReceiver = mpsc::Receiver<(oneshot::Sender<StatsResponse>, u64)>;
32
33#[derive(Debug, Default, Clone, Serialize)]
34pub struct MetricSample {
35    count: u64,
36    last_seen: u64,
37}
38#[derive(Serialize)]
39enum StatsResponse {
40    /// An existing statistics collection request is running.
41    AlreadyRunning {
42        /// Number of seconds to wait before trying again.
43        try_after: u64,
44    },
45
46    Statistics(CollectedStatistics),
47}
48
49#[derive(Serialize)]
50struct CollectedStatistics {
51    /// Start time of the collected metrics, as a Unix timestamp.
52    start_time_unix: u64,
53
54    /// End time of the collected metrics, as a Unix timestamp.
55    end_time_unix: u64,
56
57    /// Collected statistics.
58    stats: FlattenedStats,
59}
60
61#[derive(Serialize)]
62struct FlattenedMetricStat<'a> {
63    #[serde(flatten)]
64    context: &'a ContextNoOrigin,
65
66    #[serde(flatten)]
67    stats: &'a MetricSample,
68}
69
70struct FlattenedStats(HashMap<ContextNoOrigin, MetricSample>);
71
72impl Serialize for FlattenedStats {
73    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
74    where
75        S: Serializer,
76    {
77        serializer.collect_seq(
78            self.0
79                .iter()
80                .map(|(context, stats)| FlattenedMetricStat { context, stats }),
81        )
82    }
83}
84
85/// Configuration for DogStatsD statistics destination and API handler.
86#[derive(Clone)]
87pub struct DogStatsDStatisticsConfiguration {
88    api_handler: DogStatsDStatsAPIHandler,
89    rx: Arc<Mutex<StatsRequestReceiver>>,
90}
91/// State for the DogStatsD API handler.
92#[derive(Clone)]
93pub struct DogStatsDStatsAPIHandlerState {
94    tx: Arc<mpsc::Sender<(oneshot::Sender<StatsResponse>, u64)>>,
95}
96
97/// API handler for DogStatsD statistics endpoint.
98#[derive(Clone)]
99pub struct DogStatsDStatsAPIHandler {
100    state: DogStatsDStatsAPIHandlerState,
101}
102
103/// DogStatsD destination that collects metrics and processes statistics.
104pub struct DogStatsDStats {
105    rx: OwnedMutexGuard<StatsRequestReceiver>,
106}
107
108#[async_trait::async_trait]
109impl Destination for DogStatsDStats {
110    async fn run(mut self: Box<Self>, mut context: DestinationContext) -> Result<(), GenericError> {
111        let mut health = context.take_health_handle();
112        let mut collection_active = false;
113        let mut stats_response_tx: Option<tokio::sync::oneshot::Sender<StatsResponse>> = None;
114        let mut current_stats: Option<HashMap<ContextNoOrigin, MetricSample>> = None;
115        let mut stats_collection_start_time = 0;
116        let mut stats_collection_end_time = 0;
117        let collection_done = sleep(std::time::Duration::ZERO);
118        pin!(collection_done);
119
120        health.mark_ready();
121
122        loop {
123            select! {
124                _ = health.live() => {
125                    continue
126                },
127                Some((response_tx, collection_period_secs)) = self.rx.recv() => {
128                    if collection_active {
129                        // We're already collecting statistics for another stats request
130                        // so inform the caller they need to try again later.
131                        let try_after = stats_collection_end_time - get_coarse_unix_timestamp();
132
133                        // We don't care if we can successfully send back a response or not.
134                        let _ = response_tx.send(StatsResponse::AlreadyRunning { try_after });
135                    } else {
136                        // Start collection.
137                        collection_active = true;
138                        stats_collection_start_time = get_coarse_unix_timestamp();
139                        stats_collection_end_time = stats_collection_start_time + collection_period_secs;
140                        stats_response_tx = Some(response_tx);
141                        current_stats = Some(HashMap::new());
142                        collection_done.as_mut().reset(Instant::now() + Duration::from_secs(collection_period_secs));
143                    }
144                },
145                maybe_events = context.events().next() => match maybe_events {
146                    Some(events) => {
147                        if let Some(stats) = current_stats.as_mut() {
148                            // We're actively collecting, so process the metrics.
149                            for event in events {
150                                if let Event::Metric(metric) = event {
151
152                                    let context = metric.context();
153                                    let new_context = ContextNoOrigin {
154                                        name: context.name().clone(),
155                                        tags: context.tags().clone(),
156                                    };
157
158                                    let timestamp = get_coarse_unix_timestamp();
159                                    let sample = stats.entry(new_context).or_default();
160                                    sample.count += 1;
161                                    sample.last_seen = timestamp;
162
163                            }
164                        }
165                     }},
166                     None => break,
167                },
168                _ = &mut collection_done, if collection_active => {
169                    collection_active = false;
170
171                    // Build the response.
172                    let stats = match current_stats.take() {
173                        Some(stats) => stats,
174                        None => continue,
175                    };
176
177                    let response = StatsResponse::Statistics(CollectedStatistics {
178                        start_time_unix: stats_collection_start_time,
179                        end_time_unix: stats_collection_end_time,
180                        stats: FlattenedStats(stats),
181                    });
182
183                    let response_tx = match stats_response_tx.take() {
184                        Some(tx) => tx,
185                        None => continue,
186                    };
187
188                    // We don't care if we can successfully send back a response or not.
189                    let _ = response_tx.send(response);
190                }
191
192            }
193        }
194        Ok(())
195    }
196}
197
198#[derive(Eq, Hash, PartialEq, Serialize)]
199struct ContextNoOrigin {
200    name: MetaString,
201    tags: TagSet,
202}
203#[derive(Deserialize)]
204struct StatsQueryParams {
205    collection_duration_secs: u64,
206}
207
208impl DogStatsDStatsAPIHandler {
209    async fn stats_handler(
210        State(state): State<DogStatsDStatsAPIHandlerState>, Query(query): Query<StatsQueryParams>,
211    ) -> (StatusCode, String) {
212        const MAXIMUM_COLLECTION_DURATION_SECS: u64 = 600;
213        if query.collection_duration_secs > MAXIMUM_COLLECTION_DURATION_SECS {
214            return (
215                StatusCode::BAD_REQUEST,
216                format!(
217                    "Collection duration cannot be greater than {} seconds.",
218                    MAXIMUM_COLLECTION_DURATION_SECS
219                ),
220            );
221        }
222
223        let (oneshot_tx, oneshot_rx) = oneshot::channel();
224
225        state
226            .tx
227            .send((oneshot_tx, query.collection_duration_secs))
228            .await
229            .unwrap(); // TODO: use config to set collection period
230
231        match oneshot_rx.await {
232            Ok(stats) => match stats {
233                StatsResponse::Statistics(collected_stats) => match serde_json::to_string(&collected_stats) {
234                    Ok(json) => (StatusCode::OK, json),
235                    Err(e) => (
236                        StatusCode::INTERNAL_SERVER_ERROR,
237                        format!("Failed to serialize stats: {}", e),
238                    ),
239                },
240                StatsResponse::AlreadyRunning { try_after } => (
241                    StatusCode::TOO_MANY_REQUESTS,
242                    format!(
243                        "Statistics collection already active. Please try again in {} seconds.",
244                        try_after
245                    ),
246                ),
247            },
248            Err(_) => (
249                StatusCode::INTERNAL_SERVER_ERROR,
250                "Failed to collect statistics.".to_string(),
251            ),
252        }
253    }
254}
255
256impl APIHandler for DogStatsDStatsAPIHandler {
257    type State = DogStatsDStatsAPIHandlerState;
258
259    fn generate_initial_state(&self) -> Self::State {
260        self.state.clone()
261    }
262
263    fn generate_routes(&self) -> Router<Self::State> {
264        Router::new().route("/dogstatsd/stats", get(Self::stats_handler))
265    }
266}
267
268impl DogStatsDStatisticsConfiguration {
269    /// Creates a new `DogStatsDStatisticsConfiguration`.
270    pub fn new() -> Self {
271        let (tx, rx) = mpsc::channel(4);
272        let state = DogStatsDStatsAPIHandlerState { tx: Arc::new(tx) };
273        let handler = DogStatsDStatsAPIHandler { state };
274
275        Self {
276            api_handler: handler,
277            rx: Arc::new(Mutex::new(rx)),
278        }
279    }
280
281    /// Returns an API handler for DogStatsD API.
282    pub fn api_handler(&self) -> DogStatsDStatsAPIHandler {
283        self.api_handler.clone()
284    }
285}
286
287#[async_trait]
288impl DestinationBuilder for DogStatsDStatisticsConfiguration {
289    fn input_event_type(&self) -> EventType {
290        EventType::Metric
291    }
292
293    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Destination + Send>, GenericError> {
294        let rx = self.rx.clone().try_lock_owned()?;
295        Ok(Box::new(DogStatsDStats { rx }))
296    }
297}
298
299impl MemoryBounds for DogStatsDStatisticsConfiguration {
300    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
301        builder
302            .minimum()
303            .with_single_value::<DogStatsDStats>("component struct");
304    }
305}