Skip to main content

saluki_app/metrics/
api.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6    extract::{Query, State},
7    response::IntoResponse,
8    routing::{post, Router},
9    APIHandler, DynamicRoute, EndpointType, StatusCode,
10};
11use saluki_common::sync::shutdown::ShutdownHandle;
12use saluki_core::{
13    observability::metrics::FilterHandle,
14    runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture},
15};
16use saluki_error::generic_error;
17use serde::Deserialize;
18use tokio::{
19    pin, select,
20    sync::{mpsc, Mutex},
21    time::sleep,
22};
23use tracing::info;
24
/// Upper bound, in seconds, on how long a single metrics filter override may stay active (one hour).
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
26
27#[derive(Deserialize)]
28struct OverrideQueryParams {
29    time_secs: u64,
30}
31
32/// State used for the metrics API handler.
33#[derive(Clone)]
34pub struct MetricsHandlerState {
35    override_tx: mpsc::Sender<Option<(Duration, Level)>>,
36}
37
38/// An API handler for updating metric filtering directives at runtime.
39///
40/// This handler exposes two main routes -- `/metrics/override` and `/metrics/reset` -- which allow for
41/// overriding the default metric filtering directives (configured at startup) at runtime, and then resetting them once
42/// the override is no longer needed.
43///
44/// As this has the potential for incredibly high metrics cardinality at runtime, the override is set with a specific
45/// duration in which it will apply. Once an override has been active for the configured duration, it will automatically
46/// be reset unless the override is refreshed before the duration elapses.
47///
48/// The maximum duration for an override is 60 minutes.
49pub struct MetricsAPIHandler {
50    state: MetricsHandlerState,
51}
52
53impl MetricsAPIHandler {
54    async fn override_handler(
55        State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
56    ) -> impl IntoResponse {
57        // Make sure the override length is within the acceptable range.
58        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
59            return (
60                StatusCode::BAD_REQUEST,
61                format!(
62                    "override time cannot be greater than {} seconds",
63                    MAXIMUM_OVERRIDE_LENGTH_SECS
64                ),
65            );
66        }
67
68        // Parse the override duration and create a new filter from the body.
69        let duration = Duration::from_secs(params.time_secs);
70        let new_level = match Level::try_from(body.as_str()) {
71            Ok(level) => level,
72            Err(e) => {
73                return (
74                    StatusCode::BAD_REQUEST,
75                    format!("failed to parse override filter: {}", e),
76                )
77            }
78        };
79
80        // Instruct the override processor to apply the new metric filtering directives for the given duration.
81        let _ = state.override_tx.send(Some((duration, new_level))).await;
82
83        (StatusCode::OK, "acknowledged".to_string())
84    }
85
86    async fn reset_handler(State(state): State<MetricsHandlerState>) {
87        // Instruct the override processor to immediately reset back to the original metric filtering directives.
88        let _ = state.override_tx.send(None).await;
89    }
90}
91
92impl APIHandler for MetricsAPIHandler {
93    type State = MetricsHandlerState;
94
95    fn generate_initial_state(&self) -> Self::State {
96        self.state.clone()
97    }
98
99    fn generate_routes(&self) -> Router<Self::State> {
100        Router::new()
101            .route("/metrics/override", post(Self::override_handler))
102            .route("/metrics/reset", post(Self::reset_handler))
103    }
104}
105
106/// A worker that processes dynamic metric filter override requests.
107///
108/// When running, the worker asserts a set of routes (based on [`MetricsAPIHandler`]) that allow triggering
109/// an override of the current metrics filter directives as well as clearing (resetting) the active override.
110pub struct MetricsOverrideWorker {
111    handler: MetricsAPIHandler,
112    state: Arc<Mutex<MetricsOverrideWorkerState>>,
113}
114
115struct MetricsOverrideWorkerState {
116    filter_handle: FilterHandle,
117    override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
118}
119
120impl MetricsOverrideWorker {
121    /// Creates a new `MetricsOverrideWorker` driving the given filter handle.
122    pub(super) fn new(filter_handle: FilterHandle) -> Self {
123        let (override_tx, override_rx) = mpsc::channel(1);
124        let handler = MetricsAPIHandler {
125            state: MetricsHandlerState { override_tx },
126        };
127        Self {
128            handler,
129            state: Arc::new(Mutex::new(MetricsOverrideWorkerState {
130                filter_handle,
131                override_rx,
132            })),
133        }
134    }
135}
136
137#[async_trait]
138impl Supervisable for MetricsOverrideWorker {
139    fn name(&self) -> &str {
140        "dynamic-metrics-override-processor"
141    }
142
143    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
144        let mut state = self.state.clone().lock_owned().await;
145        let metrics_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
146
147        Ok(Box::pin(async move {
148            DataspaceRegistry::try_current()
149                .ok_or_else(|| generic_error!("Dataspace not available."))?
150                .assert(metrics_route, "metrics-api");
151
152            process_override_requests(&mut state, process_shutdown).await;
153
154            Ok(())
155        }))
156    }
157}
158
159async fn process_override_requests(state: &mut MetricsOverrideWorkerState, process_shutdown: ShutdownHandle) {
160    let mut override_active = false;
161    let override_timeout = sleep(Duration::MAX);
162
163    pin!(override_timeout, process_shutdown);
164
165    loop {
166        select! {
167            _ = &mut process_shutdown => break,
168            maybe_override = state.override_rx.recv() => match maybe_override {
169                Some(Some((duration, new_level))) => {
170                    // TODO: Using the `Debug` representation of `Level` is noisy, and we should add a method upstream to
171                    // just get the stringified representation of the level instead.
172                    info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
173
174                    state.filter_handle.override_filter(new_level);
175
176                    // Mark ourselves as having an active override and update the override timeout.
177                    override_active = true;
178                    override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
179                },
180
181                Some(None) => {
182                    // We've been instructed to immediately reset the filter back to the original one, so simply update
183                    // the override timeout to fire as soon as possible.
184                    override_timeout.as_mut().reset(tokio::time::Instant::now());
185                },
186
187                // Our sender has dropped, so there's no more override requests for us to handle.
188                None => break,
189            },
190            _ = &mut override_timeout => {
191                // Our override timeout has fired. If we have an active override, reset it.
192                //
193                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
194                // reset the timeout with a long duration.
195                if override_active {
196                    override_active = false;
197
198                    state.filter_handle.reset_filter();
199
200                    info!("Restored original metric filtering directive.");
201                }
202
203                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
204            }
205        }
206    }
207}