saluki_app/metrics/
api.rs1use std::{sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6 extract::{Query, State},
7 response::IntoResponse,
8 routing::{post, Router},
9 APIHandler, StatusCode,
10};
11use saluki_core::{
12 observability::metrics::FilterHandle,
13 runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
14};
15use saluki_error::generic_error;
16use serde::Deserialize;
17use tokio::{select, sync::mpsc, time::sleep};
18use tracing::info;
19
/// Upper bound, in seconds, on how long a single metric filter override may last (one hour).
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
21
22#[derive(Deserialize)]
23struct OverrideQueryParams {
24 time_secs: u64,
25}
26
27#[derive(Clone)]
29pub struct MetricsHandlerState {
30 override_tx: mpsc::Sender<Option<(Duration, Level)>>,
31}
32
33pub struct MetricsAPIHandler {
45 state: MetricsHandlerState,
46}
47
48impl MetricsAPIHandler {
49 pub(super) fn new(filter_handle: FilterHandle) -> (Self, MetricsOverrideWorker) {
54 let (override_tx, override_rx) = mpsc::channel(1);
55 let worker = MetricsOverrideWorker {
56 state: Mutex::new(Some(MetricsOverrideWorkerState {
57 filter_handle,
58 override_rx,
59 })),
60 };
61
62 (
63 Self {
64 state: MetricsHandlerState { override_tx },
65 },
66 worker,
67 )
68 }
69
70 async fn override_handler(
71 State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
72 ) -> impl IntoResponse {
73 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
75 return (
76 StatusCode::BAD_REQUEST,
77 format!(
78 "override time cannot be greater than {} seconds",
79 MAXIMUM_OVERRIDE_LENGTH_SECS
80 ),
81 );
82 }
83
84 let duration = Duration::from_secs(params.time_secs);
86 let new_level = match Level::try_from(body.as_str()) {
87 Ok(level) => level,
88 Err(e) => {
89 return (
90 StatusCode::BAD_REQUEST,
91 format!("failed to parse override filter: {}", e),
92 )
93 }
94 };
95
96 let _ = state.override_tx.send(Some((duration, new_level))).await;
98
99 (StatusCode::OK, "acknowledged".to_string())
100 }
101
102 async fn reset_handler(State(state): State<MetricsHandlerState>) {
103 let _ = state.override_tx.send(None).await;
105 }
106}
107
108impl APIHandler for MetricsAPIHandler {
109 type State = MetricsHandlerState;
110
111 fn generate_initial_state(&self) -> Self::State {
112 self.state.clone()
113 }
114
115 fn generate_routes(&self) -> Router<Self::State> {
116 Router::new()
117 .route("/metrics/override", post(Self::override_handler))
118 .route("/metrics/reset", post(Self::reset_handler))
119 }
120}
121
122pub struct MetricsOverrideWorker {
133 state: Mutex<Option<MetricsOverrideWorkerState>>,
134}
135
136struct MetricsOverrideWorkerState {
137 filter_handle: FilterHandle,
138 override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
139}
140
141#[async_trait]
142impl Supervisable for MetricsOverrideWorker {
143 fn name(&self) -> &str {
144 "dynamic-metrics-override-processor"
145 }
146
147 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
148 let MetricsOverrideWorkerState {
149 filter_handle,
150 override_rx,
151 } = self
152 .state
153 .lock()
154 .unwrap()
155 .take()
156 .ok_or_else(|| InitializationError::Failed {
157 source: generic_error!("metrics override worker has already been initialized"),
158 })?;
159
160 Ok(Box::pin(async move {
161 process_override_requests(filter_handle, override_rx, process_shutdown).await;
162 Ok(())
163 }))
164 }
165}
166
167async fn process_override_requests(
168 filter_handle: FilterHandle, mut rx: mpsc::Receiver<Option<(Duration, Level)>>,
169 mut process_shutdown: ProcessShutdown,
170) {
171 let mut override_active = false;
172 let override_timeout = sleep(Duration::MAX);
173
174 tokio::pin!(override_timeout);
175
176 let shutdown = process_shutdown.wait_for_shutdown();
177 tokio::pin!(shutdown);
178
179 loop {
180 select! {
181 _ = &mut shutdown => break,
182 maybe_override = rx.recv() => match maybe_override {
183 Some(Some((duration, new_level))) => {
184 info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
187
188 filter_handle.override_filter(new_level);
189
190 override_active = true;
192 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
193 },
194
195 Some(None) => {
196 override_timeout.as_mut().reset(tokio::time::Instant::now());
199 },
200
201 None => break,
203 },
204 _ = &mut override_timeout => {
205 if override_active {
210 override_active = false;
211
212 filter_handle.reset_filter();
213
214 info!("Restored original metric filtering directive.");
215 }
216
217 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
218 }
219 }
220 }
221}