saluki_app/metrics/
api.rs1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6 extract::{Query, State},
7 response::IntoResponse,
8 routing::{post, Router},
9 APIHandler, DynamicRoute, EndpointType, StatusCode,
10};
11use saluki_core::{
12 observability::metrics::FilterHandle,
13 runtime::{state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
14};
15use saluki_error::generic_error;
16use serde::Deserialize;
17use tokio::{
18 select,
19 sync::{mpsc, Mutex},
20 time::sleep,
21};
22use tracing::info;
23
/// Upper bound, in seconds, on how long a single metric level override may be requested for.
///
/// Sixty minutes of sixty seconds each, i.e. one hour.
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 60 * 60;
25
26#[derive(Deserialize)]
27struct OverrideQueryParams {
28 time_secs: u64,
29}
30
31#[derive(Clone)]
33pub struct MetricsHandlerState {
34 override_tx: mpsc::Sender<Option<(Duration, Level)>>,
35}
36
37pub struct MetricsAPIHandler {
49 state: MetricsHandlerState,
50}
51
52impl MetricsAPIHandler {
53 async fn override_handler(
54 State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
55 ) -> impl IntoResponse {
56 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
58 return (
59 StatusCode::BAD_REQUEST,
60 format!(
61 "override time cannot be greater than {} seconds",
62 MAXIMUM_OVERRIDE_LENGTH_SECS
63 ),
64 );
65 }
66
67 let duration = Duration::from_secs(params.time_secs);
69 let new_level = match Level::try_from(body.as_str()) {
70 Ok(level) => level,
71 Err(e) => {
72 return (
73 StatusCode::BAD_REQUEST,
74 format!("failed to parse override filter: {}", e),
75 )
76 }
77 };
78
79 let _ = state.override_tx.send(Some((duration, new_level))).await;
81
82 (StatusCode::OK, "acknowledged".to_string())
83 }
84
85 async fn reset_handler(State(state): State<MetricsHandlerState>) {
86 let _ = state.override_tx.send(None).await;
88 }
89}
90
91impl APIHandler for MetricsAPIHandler {
92 type State = MetricsHandlerState;
93
94 fn generate_initial_state(&self) -> Self::State {
95 self.state.clone()
96 }
97
98 fn generate_routes(&self) -> Router<Self::State> {
99 Router::new()
100 .route("/metrics/override", post(Self::override_handler))
101 .route("/metrics/reset", post(Self::reset_handler))
102 }
103}
104
105pub struct MetricsOverrideWorker {
110 handler: MetricsAPIHandler,
111 state: Arc<Mutex<MetricsOverrideWorkerState>>,
112}
113
114struct MetricsOverrideWorkerState {
115 filter_handle: FilterHandle,
116 override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
117}
118
119impl MetricsOverrideWorker {
120 pub(super) fn new(filter_handle: FilterHandle) -> Self {
122 let (override_tx, override_rx) = mpsc::channel(1);
123 let handler = MetricsAPIHandler {
124 state: MetricsHandlerState { override_tx },
125 };
126 Self {
127 handler,
128 state: Arc::new(Mutex::new(MetricsOverrideWorkerState {
129 filter_handle,
130 override_rx,
131 })),
132 }
133 }
134}
135
136#[async_trait]
137impl Supervisable for MetricsOverrideWorker {
138 fn name(&self) -> &str {
139 "dynamic-metrics-override-processor"
140 }
141
142 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
143 let mut state = self.state.clone().lock_owned().await;
144 let metrics_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
145
146 Ok(Box::pin(async move {
147 DataspaceRegistry::try_current()
148 .ok_or_else(|| generic_error!("Dataspace not available."))?
149 .assert(metrics_route, "metrics-api");
150
151 process_override_requests(&mut state, process_shutdown).await;
152 Ok(())
153 }))
154 }
155}
156
157async fn process_override_requests(state: &mut MetricsOverrideWorkerState, mut process_shutdown: ProcessShutdown) {
158 let mut override_active = false;
159 let override_timeout = sleep(Duration::MAX);
160
161 tokio::pin!(override_timeout);
162
163 let shutdown = process_shutdown.wait_for_shutdown();
164 tokio::pin!(shutdown);
165
166 loop {
167 select! {
168 _ = &mut shutdown => break,
169 maybe_override = state.override_rx.recv() => match maybe_override {
170 Some(Some((duration, new_level))) => {
171 info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
174
175 state.filter_handle.override_filter(new_level);
176
177 override_active = true;
179 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
180 },
181
182 Some(None) => {
183 override_timeout.as_mut().reset(tokio::time::Instant::now());
186 },
187
188 None => break,
190 },
191 _ = &mut override_timeout => {
192 if override_active {
197 override_active = false;
198
199 state.filter_handle.reset_filter();
200
201 info!("Restored original metric filtering directive.");
202 }
203
204 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
205 }
206 }
207 }
208}