saluki_app/metrics/api.rs

1use std::time::Duration;
2
3use metrics::Level;
4use saluki_api::{
5    extract::{Query, State},
6    response::IntoResponse,
7    routing::{post, Router},
8    APIHandler, StatusCode,
9};
10use saluki_common::task::spawn_traced_named;
11use saluki_core::observability::metrics::FilterHandle;
12use serde::Deserialize;
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::info;
15
/// Longest duration, in seconds, that a single metric filtering override may be requested for (one hour).
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
17
18#[derive(Deserialize)]
19struct OverrideQueryParams {
20    time_secs: u64,
21}
22
23/// State used for the metrics API handler.
24#[derive(Clone)]
25pub struct MetricsHandlerState {
26    override_tx: mpsc::Sender<Option<(Duration, Level)>>,
27}
28
29/// An API handler for updating metric filtering directives at runtime.
30///
31/// This handler exposes two main routes -- `/metrics/override` and `/metrics/reset` -- which allow for
32/// overriding the default metric filtering directives (configured at startup) at runtime, and then resetting them once
33/// the override is no longer needed.
34///
35/// As this has the potential for incredibly high metrics cardinality at runtime, the override is set with a specific
36/// duration in which it will apply. Once an override has been active for the configured duration, it will automatically
37/// be reset unless the override is refreshed before the duration elapses.
38///
39/// The maximum duration for an override is 60 minutes.
40pub struct MetricsAPIHandler {
41    state: MetricsHandlerState,
42}
43
44impl MetricsAPIHandler {
45    pub(super) fn new(filter_handle: FilterHandle) -> Self {
46        // Spawn our background task that will handle override requests.
47        let (override_tx, override_rx) = mpsc::channel(1);
48        spawn_traced_named(
49            "dynamic-metrics-override-processor",
50            process_override_requests(filter_handle, override_rx),
51        );
52
53        Self {
54            state: MetricsHandlerState { override_tx },
55        }
56    }
57
58    async fn override_handler(
59        State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
60    ) -> impl IntoResponse {
61        // Make sure the override length is within the acceptable range.
62        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
63            return (
64                StatusCode::BAD_REQUEST,
65                format!(
66                    "override time cannot be greater than {} seconds",
67                    MAXIMUM_OVERRIDE_LENGTH_SECS
68                ),
69            );
70        }
71
72        // Parse the override duration and create a new filter from the body.
73        let duration = Duration::from_secs(params.time_secs);
74        let new_level = match Level::try_from(body.as_str()) {
75            Ok(level) => level,
76            Err(e) => {
77                return (
78                    StatusCode::BAD_REQUEST,
79                    format!("failed to parse override filter: {}", e),
80                )
81            }
82        };
83
84        // Instruct the override processor to apply the new metric filtering directives for the given duration.
85        let _ = state.override_tx.send(Some((duration, new_level))).await;
86
87        (StatusCode::OK, "acknowledged".to_string())
88    }
89
90    async fn reset_handler(State(state): State<MetricsHandlerState>) {
91        // Instruct the override processor to immediately reset back to the original metric filtering directives.
92        let _ = state.override_tx.send(None).await;
93    }
94}
95
96impl APIHandler for MetricsAPIHandler {
97    type State = MetricsHandlerState;
98
99    fn generate_initial_state(&self) -> Self::State {
100        self.state.clone()
101    }
102
103    fn generate_routes(&self) -> Router<Self::State> {
104        Router::new()
105            .route("/metrics/override", post(Self::override_handler))
106            .route("/metrics/reset", post(Self::reset_handler))
107    }
108}
109
110async fn process_override_requests(filter_handle: FilterHandle, mut rx: mpsc::Receiver<Option<(Duration, Level)>>) {
111    let mut override_active = false;
112    let override_timeout = sleep(Duration::MAX);
113
114    tokio::pin!(override_timeout);
115
116    loop {
117        select! {
118            maybe_override = rx.recv() => match maybe_override {
119                Some(Some((duration, new_level))) => {
120                    // TODO: Using the `Debug` representation of `Level` is noisy, and we should add a method upstream to
121                    // just get the stringified representation of the level instead.
122                    info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
123
124                    filter_handle.override_filter(new_level);
125
126                    // Mark ourselves as having an active override and update the override timeout.
127                    override_active = true;
128                    override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
129                },
130
131                Some(None) => {
132                    // We've been instructed to immediately reset the filter back to the original one, so simply update
133                    // the override timeout to fire as soon as possible.
134                    override_timeout.as_mut().reset(tokio::time::Instant::now());
135                },
136
137                // Our sender has dropped, so there's no more override requests for us to handle.
138                None => break,
139            },
140            _ = &mut override_timeout => {
141                // Our override timeout has fired. If we have an active override, reset it.
142                //
143                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
144                // reset the timeout with a long duration.
145                if override_active {
146                    override_active = false;
147
148                    filter_handle.reset_filter();
149
150                    info!("Restored original metric filtering directive.");
151                }
152
153                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
154            }
155        }
156    }
157}