Skip to main content

saluki_app/metrics/
api.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6    extract::{Query, State},
7    response::IntoResponse,
8    routing::{post, Router},
9    APIHandler, DynamicRoute, EndpointType, StatusCode,
10};
11use saluki_core::{
12    observability::metrics::FilterHandle,
13    runtime::{state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
14};
15use saluki_error::generic_error;
16use serde::Deserialize;
17use tokio::{
18    select,
19    sync::{mpsc, Mutex},
20    time::sleep,
21};
22use tracing::info;
23
/// Upper bound, in seconds, on how long a single override may stay in effect (one hour).
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
25
26#[derive(Deserialize)]
27struct OverrideQueryParams {
28    time_secs: u64,
29}
30
31/// State used for the metrics API handler.
32#[derive(Clone)]
33pub struct MetricsHandlerState {
34    override_tx: mpsc::Sender<Option<(Duration, Level)>>,
35}
36
37/// An API handler for updating metric filtering directives at runtime.
38///
39/// This handler exposes two main routes -- `/metrics/override` and `/metrics/reset` -- which allow for
40/// overriding the default metric filtering directives (configured at startup) at runtime, and then resetting them once
41/// the override is no longer needed.
42///
43/// As this has the potential for incredibly high metrics cardinality at runtime, the override is set with a specific
44/// duration in which it will apply. Once an override has been active for the configured duration, it will automatically
45/// be reset unless the override is refreshed before the duration elapses.
46///
47/// The maximum duration for an override is 60 minutes.
48pub struct MetricsAPIHandler {
49    state: MetricsHandlerState,
50}
51
52impl MetricsAPIHandler {
53    async fn override_handler(
54        State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
55    ) -> impl IntoResponse {
56        // Make sure the override length is within the acceptable range.
57        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
58            return (
59                StatusCode::BAD_REQUEST,
60                format!(
61                    "override time cannot be greater than {} seconds",
62                    MAXIMUM_OVERRIDE_LENGTH_SECS
63                ),
64            );
65        }
66
67        // Parse the override duration and create a new filter from the body.
68        let duration = Duration::from_secs(params.time_secs);
69        let new_level = match Level::try_from(body.as_str()) {
70            Ok(level) => level,
71            Err(e) => {
72                return (
73                    StatusCode::BAD_REQUEST,
74                    format!("failed to parse override filter: {}", e),
75                )
76            }
77        };
78
79        // Instruct the override processor to apply the new metric filtering directives for the given duration.
80        let _ = state.override_tx.send(Some((duration, new_level))).await;
81
82        (StatusCode::OK, "acknowledged".to_string())
83    }
84
85    async fn reset_handler(State(state): State<MetricsHandlerState>) {
86        // Instruct the override processor to immediately reset back to the original metric filtering directives.
87        let _ = state.override_tx.send(None).await;
88    }
89}
90
91impl APIHandler for MetricsAPIHandler {
92    type State = MetricsHandlerState;
93
94    fn generate_initial_state(&self) -> Self::State {
95        self.state.clone()
96    }
97
98    fn generate_routes(&self) -> Router<Self::State> {
99        Router::new()
100            .route("/metrics/override", post(Self::override_handler))
101            .route("/metrics/reset", post(Self::reset_handler))
102    }
103}
104
105/// A worker that processes dynamic metric filter override requests.
106///
107/// When running, the worker asserts a set of routes (based on [`MetricsAPIHandler`]) that allow triggering
108/// an override of the current metrics filter directives as well as clearing (resetting) the active override.
109pub struct MetricsOverrideWorker {
110    handler: MetricsAPIHandler,
111    state: Arc<Mutex<MetricsOverrideWorkerState>>,
112}
113
114struct MetricsOverrideWorkerState {
115    filter_handle: FilterHandle,
116    override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
117}
118
119impl MetricsOverrideWorker {
120    /// Creates a new `MetricsOverrideWorker` driving the given filter handle.
121    pub(super) fn new(filter_handle: FilterHandle) -> Self {
122        let (override_tx, override_rx) = mpsc::channel(1);
123        let handler = MetricsAPIHandler {
124            state: MetricsHandlerState { override_tx },
125        };
126        Self {
127            handler,
128            state: Arc::new(Mutex::new(MetricsOverrideWorkerState {
129                filter_handle,
130                override_rx,
131            })),
132        }
133    }
134}
135
136#[async_trait]
137impl Supervisable for MetricsOverrideWorker {
138    fn name(&self) -> &str {
139        "dynamic-metrics-override-processor"
140    }
141
142    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
143        let mut state = self.state.clone().lock_owned().await;
144        let metrics_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
145
146        Ok(Box::pin(async move {
147            DataspaceRegistry::try_current()
148                .ok_or_else(|| generic_error!("Dataspace not available."))?
149                .assert(metrics_route, "metrics-api");
150
151            process_override_requests(&mut state, process_shutdown).await;
152            Ok(())
153        }))
154    }
155}
156
157async fn process_override_requests(state: &mut MetricsOverrideWorkerState, mut process_shutdown: ProcessShutdown) {
158    let mut override_active = false;
159    let override_timeout = sleep(Duration::MAX);
160
161    tokio::pin!(override_timeout);
162
163    let shutdown = process_shutdown.wait_for_shutdown();
164    tokio::pin!(shutdown);
165
166    loop {
167        select! {
168            _ = &mut shutdown => break,
169            maybe_override = state.override_rx.recv() => match maybe_override {
170                Some(Some((duration, new_level))) => {
171                    // TODO: Using the `Debug` representation of `Level` is noisy, and we should add a method upstream to
172                    // just get the stringified representation of the level instead.
173                    info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
174
175                    state.filter_handle.override_filter(new_level);
176
177                    // Mark ourselves as having an active override and update the override timeout.
178                    override_active = true;
179                    override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
180                },
181
182                Some(None) => {
183                    // We've been instructed to immediately reset the filter back to the original one, so simply update
184                    // the override timeout to fire as soon as possible.
185                    override_timeout.as_mut().reset(tokio::time::Instant::now());
186                },
187
188                // Our sender has dropped, so there's no more override requests for us to handle.
189                None => break,
190            },
191            _ = &mut override_timeout => {
192                // Our override timeout has fired. If we have an active override, reset it.
193                //
194                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
195                // reset the timeout with a long duration.
196                if override_active {
197                    override_active = false;
198
199                    state.filter_handle.reset_filter();
200
201                    info!("Restored original metric filtering directive.");
202                }
203
204                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
205            }
206        }
207    }
208}