saluki_app/metrics/
api.rs1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use metrics::Level;
5use saluki_api::{
6 extract::{Query, State},
7 response::IntoResponse,
8 routing::{post, Router},
9 APIHandler, DynamicRoute, EndpointType, StatusCode,
10};
11use saluki_common::sync::shutdown::ShutdownHandle;
12use saluki_core::{
13 observability::metrics::FilterHandle,
14 runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture},
15};
16use saluki_error::generic_error;
17use serde::Deserialize;
18use tokio::{
19 pin, select,
20 sync::{mpsc, Mutex},
21 time::sleep,
22};
23use tracing::info;
24
/// Longest duration, in seconds, that a single metrics filter override may be requested for (one hour).
const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 3_600;
26
27#[derive(Deserialize)]
28struct OverrideQueryParams {
29 time_secs: u64,
30}
31
32#[derive(Clone)]
34pub struct MetricsHandlerState {
35 override_tx: mpsc::Sender<Option<(Duration, Level)>>,
36}
37
38pub struct MetricsAPIHandler {
50 state: MetricsHandlerState,
51}
52
53impl MetricsAPIHandler {
54 async fn override_handler(
55 State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
56 ) -> impl IntoResponse {
57 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
59 return (
60 StatusCode::BAD_REQUEST,
61 format!(
62 "override time cannot be greater than {} seconds",
63 MAXIMUM_OVERRIDE_LENGTH_SECS
64 ),
65 );
66 }
67
68 let duration = Duration::from_secs(params.time_secs);
70 let new_level = match Level::try_from(body.as_str()) {
71 Ok(level) => level,
72 Err(e) => {
73 return (
74 StatusCode::BAD_REQUEST,
75 format!("failed to parse override filter: {}", e),
76 )
77 }
78 };
79
80 let _ = state.override_tx.send(Some((duration, new_level))).await;
82
83 (StatusCode::OK, "acknowledged".to_string())
84 }
85
86 async fn reset_handler(State(state): State<MetricsHandlerState>) {
87 let _ = state.override_tx.send(None).await;
89 }
90}
91
92impl APIHandler for MetricsAPIHandler {
93 type State = MetricsHandlerState;
94
95 fn generate_initial_state(&self) -> Self::State {
96 self.state.clone()
97 }
98
99 fn generate_routes(&self) -> Router<Self::State> {
100 Router::new()
101 .route("/metrics/override", post(Self::override_handler))
102 .route("/metrics/reset", post(Self::reset_handler))
103 }
104}
105
106pub struct MetricsOverrideWorker {
111 handler: MetricsAPIHandler,
112 state: Arc<Mutex<MetricsOverrideWorkerState>>,
113}
114
115struct MetricsOverrideWorkerState {
116 filter_handle: FilterHandle,
117 override_rx: mpsc::Receiver<Option<(Duration, Level)>>,
118}
119
120impl MetricsOverrideWorker {
121 pub(super) fn new(filter_handle: FilterHandle) -> Self {
123 let (override_tx, override_rx) = mpsc::channel(1);
124 let handler = MetricsAPIHandler {
125 state: MetricsHandlerState { override_tx },
126 };
127 Self {
128 handler,
129 state: Arc::new(Mutex::new(MetricsOverrideWorkerState {
130 filter_handle,
131 override_rx,
132 })),
133 }
134 }
135}
136
137#[async_trait]
138impl Supervisable for MetricsOverrideWorker {
139 fn name(&self) -> &str {
140 "dynamic-metrics-override-processor"
141 }
142
143 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
144 let mut state = self.state.clone().lock_owned().await;
145 let metrics_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
146
147 Ok(Box::pin(async move {
148 DataspaceRegistry::try_current()
149 .ok_or_else(|| generic_error!("Dataspace not available."))?
150 .assert(metrics_route, "metrics-api");
151
152 process_override_requests(&mut state, process_shutdown).await;
153
154 Ok(())
155 }))
156 }
157}
158
159async fn process_override_requests(state: &mut MetricsOverrideWorkerState, process_shutdown: ShutdownHandle) {
160 let mut override_active = false;
161 let override_timeout = sleep(Duration::MAX);
162
163 pin!(override_timeout, process_shutdown);
164
165 loop {
166 select! {
167 _ = &mut process_shutdown => break,
168 maybe_override = state.override_rx.recv() => match maybe_override {
169 Some(Some((duration, new_level))) => {
170 info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
173
174 state.filter_handle.override_filter(new_level);
175
176 override_active = true;
178 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
179 },
180
181 Some(None) => {
182 override_timeout.as_mut().reset(tokio::time::Instant::now());
185 },
186
187 None => break,
189 },
190 _ = &mut override_timeout => {
191 if override_active {
196 override_active = false;
197
198 state.filter_handle.reset_filter();
199
200 info!("Restored original metric filtering directive.");
201 }
202
203 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
204 }
205 }
206 }
207}