saluki_app/logging/
api.rs1use std::{sync::Mutex, time::Duration};
2
3use saluki_api::{
4 extract::{Query, State},
5 response::IntoResponse,
6 routing::{post, Router},
7 APIHandler, StatusCode,
8};
9use saluki_common::task::spawn_traced_named;
10use serde::Deserialize;
11use tokio::{select, sync::mpsc, time::sleep};
12use tracing::{error, info};
13use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
14
/// Process-wide slot that holds the logging API handler until it is claimed.
///
/// Filled by `set_logging_api_handler` and emptied by `acquire_logging_api_handler`.
static API_HANDLER: Mutex<Option<LoggingAPIHandler>> = Mutex::new(None);
16
17pub(super) fn set_logging_api_handler(handler: LoggingAPIHandler) {
18 API_HANDLER.lock().unwrap().replace(handler);
19}
20
21pub fn acquire_logging_api_handler() -> Option<LoggingAPIHandler> {
29 API_HANDLER.lock().unwrap().take()
30}
31
/// Query-string parameters accepted by the override route.
#[derive(Deserialize)]
struct OverrideQueryParams {
    /// How long the override should stay in effect, in seconds.
    time_secs: u64,
}
36
/// State shared with the logging route handlers.
#[derive(Clone)]
pub struct LoggingHandlerState {
    /// Sending half of the channel read by the override processor task.
    ///
    /// `Some((duration, filter))` requests an override; `None` requests a reset.
    override_tx: mpsc::Sender<Option<(Duration, EnvFilter)>>,
}
42
/// API handler exposing routes to override and reset the log filtering directives.
pub struct LoggingAPIHandler {
    /// Handler state; a clone is returned from `generate_initial_state`.
    state: LoggingHandlerState,
}
57
58impl LoggingAPIHandler {
59 pub(super) fn new(original_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>) -> Self {
60 let (override_tx, override_rx) = mpsc::channel(1);
62 spawn_traced_named(
63 "dynamic-logging-override-processor",
64 process_override_requests(original_filter, reload_handle, override_rx),
65 );
66
67 Self {
68 state: LoggingHandlerState { override_tx },
69 }
70 }
71
72 async fn override_handler(
73 State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
74 ) -> impl IntoResponse {
75 const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
77 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
78 return (
79 StatusCode::BAD_REQUEST,
80 format!(
81 "override time cannot be greater than {} seconds",
82 MAXIMUM_OVERRIDE_LENGTH_SECS
83 ),
84 );
85 }
86
87 let duration = Duration::from_secs(params.time_secs);
89 let new_filter = match EnvFilter::try_new(body) {
90 Ok(filter) => filter,
91 Err(e) => {
92 return (
93 StatusCode::BAD_REQUEST,
94 format!("failed to parse override filter: {}", e),
95 )
96 }
97 };
98
99 let _ = state.override_tx.send(Some((duration, new_filter))).await;
101
102 (StatusCode::OK, "acknowledged".to_string())
103 }
104
105 async fn reset_handler(State(state): State<LoggingHandlerState>) {
106 let _ = state.override_tx.send(None).await;
108 }
109}
110
111impl APIHandler for LoggingAPIHandler {
112 type State = LoggingHandlerState;
113
114 fn generate_initial_state(&self) -> Self::State {
115 self.state.clone()
116 }
117
118 fn generate_routes(&self) -> Router<Self::State> {
119 Router::new()
120 .route("/logging/override", post(Self::override_handler))
121 .route("/logging/reset", post(Self::reset_handler))
122 }
123}
124
125async fn process_override_requests(
126 original_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
127 mut rx: mpsc::Receiver<Option<(Duration, EnvFilter)>>,
128) {
129 let mut override_active = false;
130 let override_timeout = sleep(Duration::from_secs(3600));
131
132 tokio::pin!(override_timeout);
133
134 loop {
135 select! {
136 maybe_override = rx.recv() => match maybe_override {
137 Some(Some((duration, new_filter))) => {
138 info!(directives = %new_filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
139
140 match reload_handle.reload(new_filter) {
141 Ok(()) => {
142 override_active = true;
145 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
146 },
147 Err(e) => error!(error = %e, "Failed to override log filtering directives."),
148 }
149 },
150
151 Some(None) => {
152 override_timeout.as_mut().reset(tokio::time::Instant::now());
155 },
156
157 None => break,
159 },
160 _ = &mut override_timeout => {
161 if override_active {
166 override_active = false;
167
168 if let Err(e) = reload_handle.reload(original_filter.clone()) {
169 error!(error = %e, "Failed to reset log filtering directives.");
170 }
171
172 info!(directives = %original_filter, "Restored original log filtering directives.");
173 }
174
175 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
176 }
177 }
178 }
179
180 if let Err(e) = reload_handle.reload(original_filter) {
182 error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
183 }
184}