Skip to main content

saluki_app/metrics/
api.rs

1use std::{sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6    extract::{Query, State},
7    response::IntoResponse,
8    routing::{post, Router},
9    APIHandler, StatusCode,
10};
11use saluki_core::{
12    observability::metrics::FilterHandle,
13    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
14};
15use saluki_error::generic_error;
16use serde::Deserialize;
17use tokio::{select, sync::mpsc, time::sleep};
18use tracing::info;
19
/// Largest `time_secs` value (in seconds) accepted for an override request: one hour.
///
/// Also used as the re-arm interval for the worker's expiry timer while no override is active.
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
21
22#[derive(Deserialize)]
23struct OverrideQueryParams {
24    time_secs: u64,
25}
26
27/// State used for the metrics API handler.
28#[derive(Clone)]
29pub struct MetricsHandlerState {
30    override_tx: mpsc::Sender<Option<(Duration, Level)>>,
31}
32
33/// An API handler for updating metric filtering directives at runtime.
34///
35/// This handler exposes two main routes -- `/metrics/override` and `/metrics/reset` -- which allow for
36/// overriding the default metric filtering directives (configured at startup) at runtime, and then resetting them once
37/// the override is no longer needed.
38///
39/// As this has the potential for incredibly high metrics cardinality at runtime, the override is set with a specific
40/// duration in which it will apply. Once an override has been active for the configured duration, it will automatically
41/// be reset unless the override is refreshed before the duration elapses.
42///
43/// The maximum duration for an override is 60 minutes.
44pub struct MetricsAPIHandler {
45    state: MetricsHandlerState,
46}
47
48impl MetricsAPIHandler {
49    /// Creates a new `MetricsAPIHandler` and a paired [`MetricsOverrideWorker`].
50    ///
51    /// The worker must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] for the handler's
52    /// override/reset routes to take effect; without it, requests are accepted but never applied.
53    pub(super) fn new(filter_handle: FilterHandle) -> (Self, MetricsOverrideWorker) {
54        let (override_tx, override_rx) = mpsc::channel(1);
55        let worker = MetricsOverrideWorker {
56            state: Mutex::new(Some(MetricsOverrideWorkerState {
57                filter_handle,
58                override_rx,
59            })),
60        };
61
62        (
63            Self {
64                state: MetricsHandlerState { override_tx },
65            },
66            worker,
67        )
68    }
69
70    async fn override_handler(
71        State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
72    ) -> impl IntoResponse {
73        // Make sure the override length is within the acceptable range.
74        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
75            return (
76                StatusCode::BAD_REQUEST,
77                format!(
78                    "override time cannot be greater than {} seconds",
79                    MAXIMUM_OVERRIDE_LENGTH_SECS
80                ),
81            );
82        }
83
84        // Parse the override duration and create a new filter from the body.
85        let duration = Duration::from_secs(params.time_secs);
86        let new_level = match Level::try_from(body.as_str()) {
87            Ok(level) => level,
88            Err(e) => {
89                return (
90                    StatusCode::BAD_REQUEST,
91                    format!("failed to parse override filter: {}", e),
92                )
93            }
94        };
95
96        // Instruct the override processor to apply the new metric filtering directives for the given duration.
97        let _ = state.override_tx.send(Some((duration, new_level))).await;
98
99        (StatusCode::OK, "acknowledged".to_string())
100    }
101
102    async fn reset_handler(State(state): State<MetricsHandlerState>) {
103        // Instruct the override processor to immediately reset back to the original metric filtering directives.
104        let _ = state.override_tx.send(None).await;
105    }
106}
107
108impl APIHandler for MetricsAPIHandler {
109    type State = MetricsHandlerState;
110
111    fn generate_initial_state(&self) -> Self::State {
112        self.state.clone()
113    }
114
115    fn generate_routes(&self) -> Router<Self::State> {
116        Router::new()
117            .route("/metrics/override", post(Self::override_handler))
118            .route("/metrics/reset", post(Self::reset_handler))
119    }
120}
121
122/// A worker that processes dynamic metric filter override requests sent via [`MetricsAPIHandler`].
123///
124/// Holds the receiving half of the override channel; the corresponding sender is held by the API handler
125/// (which is itself stored in a static after the metrics subsystem is initialized). The worker exits cleanly
126/// on either supervisor shutdown or channel close.
127///
128/// This worker is one-shot: a successful initialization consumes the receiver. Restart by the supervisor will
129/// fail with an initialization error, propagating up to bring the supervisor down. This matches the historical
130/// fire-and-forget semantics, since the corresponding sender held by [`MetricsAPIHandler`] would no longer have
131/// a live receiver to deliver to.
132pub struct MetricsOverrideWorker {
133    state: Mutex<Option<MetricsOverrideWorkerState>>,
134}
135
136struct MetricsOverrideWorkerState {
137    filter_handle: FilterHandle,
138    override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
139}
140
141#[async_trait]
142impl Supervisable for MetricsOverrideWorker {
143    fn name(&self) -> &str {
144        "dynamic-metrics-override-processor"
145    }
146
147    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
148        let MetricsOverrideWorkerState {
149            filter_handle,
150            override_rx,
151        } = self
152            .state
153            .lock()
154            .unwrap()
155            .take()
156            .ok_or_else(|| InitializationError::Failed {
157                source: generic_error!("metrics override worker has already been initialized"),
158            })?;
159
160        Ok(Box::pin(async move {
161            process_override_requests(filter_handle, override_rx, process_shutdown).await;
162            Ok(())
163        }))
164    }
165}
166
167async fn process_override_requests(
168    filter_handle: FilterHandle, mut rx: mpsc::Receiver<Option<(Duration, Level)>>,
169    mut process_shutdown: ProcessShutdown,
170) {
171    let mut override_active = false;
172    let override_timeout = sleep(Duration::MAX);
173
174    tokio::pin!(override_timeout);
175
176    let shutdown = process_shutdown.wait_for_shutdown();
177    tokio::pin!(shutdown);
178
179    loop {
180        select! {
181            _ = &mut shutdown => break,
182            maybe_override = rx.recv() => match maybe_override {
183                Some(Some((duration, new_level))) => {
184                    // TODO: Using the `Debug` representation of `Level` is noisy, and we should add a method upstream to
185                    // just get the stringified representation of the level instead.
186                    info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
187
188                    filter_handle.override_filter(new_level);
189
190                    // Mark ourselves as having an active override and update the override timeout.
191                    override_active = true;
192                    override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
193                },
194
195                Some(None) => {
196                    // We've been instructed to immediately reset the filter back to the original one, so simply update
197                    // the override timeout to fire as soon as possible.
198                    override_timeout.as_mut().reset(tokio::time::Instant::now());
199                },
200
201                // Our sender has dropped, so there's no more override requests for us to handle.
202                None => break,
203            },
204            _ = &mut override_timeout => {
205                // Our override timeout has fired. If we have an active override, reset it.
206                //
207                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
208                // reset the timeout with a long duration.
209                if override_active {
210                    override_active = false;
211
212                    filter_handle.reset_filter();
213
214                    info!("Restored original metric filtering directive.");
215                }
216
217                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
218            }
219        }
220    }
221}