saluki_app/metrics/
api.rs1use std::time::Duration;
2
3use metrics::Level;
4use saluki_api::{
5 extract::{Query, State},
6 response::IntoResponse,
7 routing::{post, Router},
8 APIHandler, StatusCode,
9};
10use saluki_common::task::spawn_traced_named;
11use saluki_core::observability::metrics::FilterHandle;
12use serde::Deserialize;
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::info;
15
16const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 60 * 60;
17
18#[derive(Deserialize)]
19struct OverrideQueryParams {
20 time_secs: u64,
21}
22
23#[derive(Clone)]
25pub struct MetricsHandlerState {
26 override_tx: mpsc::Sender<Option<(Duration, Level)>>,
27}
28
29pub struct MetricsAPIHandler {
41 state: MetricsHandlerState,
42}
43
44impl MetricsAPIHandler {
45 pub(super) fn new(filter_handle: FilterHandle) -> Self {
46 let (override_tx, override_rx) = mpsc::channel(1);
48 spawn_traced_named(
49 "dynamic-metrics-override-processor",
50 process_override_requests(filter_handle, override_rx),
51 );
52
53 Self {
54 state: MetricsHandlerState { override_tx },
55 }
56 }
57
58 async fn override_handler(
59 State(state): State<MetricsHandlerState>, params: Query<OverrideQueryParams>, body: String,
60 ) -> impl IntoResponse {
61 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
63 return (
64 StatusCode::BAD_REQUEST,
65 format!(
66 "override time cannot be greater than {} seconds",
67 MAXIMUM_OVERRIDE_LENGTH_SECS
68 ),
69 );
70 }
71
72 let duration = Duration::from_secs(params.time_secs);
74 let new_level = match Level::try_from(body.as_str()) {
75 Ok(level) => level,
76 Err(e) => {
77 return (
78 StatusCode::BAD_REQUEST,
79 format!("failed to parse override filter: {}", e),
80 )
81 }
82 };
83
84 let _ = state.override_tx.send(Some((duration, new_level))).await;
86
87 (StatusCode::OK, "acknowledged".to_string())
88 }
89
90 async fn reset_handler(State(state): State<MetricsHandlerState>) {
91 let _ = state.override_tx.send(None).await;
93 }
94}
95
96impl APIHandler for MetricsAPIHandler {
97 type State = MetricsHandlerState;
98
99 fn generate_initial_state(&self) -> Self::State {
100 self.state.clone()
101 }
102
103 fn generate_routes(&self) -> Router<Self::State> {
104 Router::new()
105 .route("/metrics/override", post(Self::override_handler))
106 .route("/metrics/reset", post(Self::reset_handler))
107 }
108}
109
110async fn process_override_requests(filter_handle: FilterHandle, mut rx: mpsc::Receiver<Option<(Duration, Level)>>) {
111 let mut override_active = false;
112 let override_timeout = sleep(Duration::MAX);
113
114 tokio::pin!(override_timeout);
115
116 loop {
117 select! {
118 maybe_override = rx.recv() => match maybe_override {
119 Some(Some((duration, new_level))) => {
120 info!(level = ?new_level, "Overriding existing metric filtering directive for {} seconds...", duration.as_secs());
123
124 filter_handle.override_filter(new_level);
125
126 override_active = true;
128 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
129 },
130
131 Some(None) => {
132 override_timeout.as_mut().reset(tokio::time::Instant::now());
135 },
136
137 None => break,
139 },
140 _ = &mut override_timeout => {
141 if override_active {
146 override_active = false;
147
148 filter_handle.reset_filter();
149
150 info!("Restored original metric filtering directive.");
151 }
152
153 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(MAXIMUM_OVERRIDE_LENGTH_SECS));
154 }
155 }
156 }
157}