1use std::{sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5 extract::{Query, State},
6 response::IntoResponse,
7 routing::{post, Router},
8 APIHandler, StatusCode,
9};
10use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
11use saluki_error::{generic_error, GenericError};
12use serde::Deserialize;
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::{error, info};
15use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
16
17static API_HANDLER: Mutex<Option<LoggingAPIHandler>> = Mutex::new(None);
18
19pub(super) fn set_logging_api_handler(handler: LoggingAPIHandler) {
20 API_HANDLER.lock().unwrap().replace(handler);
21}
22
23pub fn acquire_logging_api_handler() -> Option<LoggingAPIHandler> {
31 API_HANDLER.lock().unwrap().take()
32}
33
34#[derive(Deserialize)]
35struct OverrideQueryParams {
36 time_secs: u64,
37}
38
39enum LoggingOverrideAction {
41 Override { duration: Duration, filter: EnvFilter },
46
47 Reset,
49
50 UpdateBase(EnvFilter),
55}
56
57#[derive(Clone)]
63pub struct LoggingOverrideController {
64 tx: mpsc::Sender<LoggingOverrideAction>,
65}
66
67impl LoggingOverrideController {
68 pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
70 self.send(LoggingOverrideAction::Override { duration, filter }).await
71 }
72
73 pub(crate) async fn reset(&self) -> Result<(), GenericError> {
75 self.send(LoggingOverrideAction::Reset).await
76 }
77
78 pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
82 self.send(LoggingOverrideAction::UpdateBase(filter)).await
83 }
84
85 async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
86 self.tx
87 .send(action)
88 .await
89 .map_err(|_| generic_error!("logging override worker is no longer running"))
90 }
91}
92
93#[derive(Clone)]
95pub struct LoggingHandlerState {
96 controller: LoggingOverrideController,
97}
98
99pub struct LoggingAPIHandler {
111 state: LoggingHandlerState,
112}
113
114impl LoggingAPIHandler {
115 pub(super) fn new(controller: LoggingOverrideController) -> Self {
117 Self {
118 state: LoggingHandlerState { controller },
119 }
120 }
121
122 async fn override_handler(
123 State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
124 ) -> impl IntoResponse {
125 const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
127 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
128 return (
129 StatusCode::BAD_REQUEST,
130 format!(
131 "override time cannot be greater than {} seconds",
132 MAXIMUM_OVERRIDE_LENGTH_SECS
133 ),
134 );
135 }
136
137 let duration = Duration::from_secs(params.time_secs);
139 let new_filter = match EnvFilter::try_new(body) {
140 Ok(filter) => filter,
141 Err(e) => {
142 return (
143 StatusCode::BAD_REQUEST,
144 format!("failed to parse override filter: {}", e),
145 )
146 }
147 };
148
149 let _ = state.controller.override_for(duration, new_filter).await;
151
152 (StatusCode::OK, "acknowledged".to_string())
153 }
154
155 async fn reset_handler(State(state): State<LoggingHandlerState>) {
156 let _ = state.controller.reset().await;
158 }
159}
160
161impl APIHandler for LoggingAPIHandler {
162 type State = LoggingHandlerState;
163
164 fn generate_initial_state(&self) -> Self::State {
165 self.state.clone()
166 }
167
168 fn generate_routes(&self) -> Router<Self::State> {
169 Router::new()
170 .route("/logging/override", post(Self::override_handler))
171 .route("/logging/reset", post(Self::reset_handler))
172 }
173}
174
175pub struct LoggingOverrideWorker {
183 state: Mutex<Option<LoggingOverrideWorkerState>>,
184}
185
186struct LoggingOverrideWorkerState {
187 initial_base: EnvFilter,
188 reload_handle: Handle<EnvFilter, Registry>,
189 rx: mpsc::Receiver<LoggingOverrideAction>,
190}
191
192impl LoggingOverrideWorker {
193 pub(super) fn new(
202 initial_base: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
203 ) -> (Self, LoggingOverrideController) {
204 let (tx, rx) = mpsc::channel(1);
205 let controller = LoggingOverrideController { tx };
206 let worker = Self {
207 state: Mutex::new(Some(LoggingOverrideWorkerState {
208 initial_base,
209 reload_handle,
210 rx,
211 })),
212 };
213
214 (worker, controller)
215 }
216}
217
218#[async_trait]
219impl Supervisable for LoggingOverrideWorker {
220 fn name(&self) -> &str {
221 "dynamic-logging-override-processor"
222 }
223
224 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
225 let LoggingOverrideWorkerState {
226 initial_base,
227 reload_handle,
228 rx,
229 } = self
230 .state
231 .lock()
232 .unwrap()
233 .take()
234 .ok_or_else(|| InitializationError::Failed {
235 source: generic_error!("logging override worker has already been initialized"),
236 })?;
237
238 Ok(Box::pin(async move {
239 process_override_actions(initial_base, reload_handle, rx, process_shutdown).await;
240 Ok(())
241 }))
242 }
243}
244
245async fn process_override_actions(
246 mut base_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
247 mut rx: mpsc::Receiver<LoggingOverrideAction>, mut process_shutdown: ProcessShutdown,
248) {
249 let mut override_active = false;
250 let override_timeout = sleep(Duration::from_secs(3600));
251
252 tokio::pin!(override_timeout);
253
254 let shutdown = process_shutdown.wait_for_shutdown();
255 tokio::pin!(shutdown);
256
257 loop {
258 select! {
259 _ = &mut shutdown => break,
260 maybe_action = rx.recv() => match maybe_action {
261 Some(LoggingOverrideAction::Override { duration, filter }) => {
262 info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
263
264 match reload_handle.reload(filter) {
265 Ok(()) => {
266 override_active = true;
269 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
270 },
271 Err(e) => error!(error = %e, "Failed to override log filtering directives."),
272 }
273 },
274
275 Some(LoggingOverrideAction::Reset) => {
276 override_timeout.as_mut().reset(tokio::time::Instant::now());
279 },
280
281 Some(LoggingOverrideAction::UpdateBase(new_base)) => {
282 base_filter = new_base;
283
284 if override_active {
285 info!(
288 directives = %base_filter,
289 "Updated base log filtering directives; application deferred until active override expires."
290 );
291 } else {
292 let new_directives = base_filter.to_string();
293 match reload_handle.reload(base_filter.clone()) {
294 Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
295 Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
296 }
297 }
298 },
299
300 None => break,
302 },
303 _ = &mut override_timeout => {
304 if override_active {
309 override_active = false;
310
311 let restore_directives = base_filter.to_string();
312 if let Err(e) = reload_handle.reload(base_filter.clone()) {
313 error!(error = %e, "Failed to reset log filtering directives.");
314 }
315
316 info!(directives = %restore_directives, "Restored base log filtering directives.");
317 }
318
319 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
320 }
321 }
322 }
323
324 if let Err(e) = reload_handle.reload(base_filter) {
326 error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
327 }
328}
329
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    /// Renders the filter currently installed behind `handle` as its directive string.
    fn current_filter(handle: &reload::Handle<EnvFilter, Registry>) -> String {
        let installed = handle.clone_current().expect("reload layer should be alive");
        installed.to_string()
    }

    /// Polls every 10ms until the installed filter renders as `expected`.
    ///
    /// Panics if the filter still differs once the one-second deadline has passed.
    async fn wait_for_filter(handle: &reload::Handle<EnvFilter, Registry>, expected: &str) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);

        let mut actual = current_filter(handle);
        while actual != expected {
            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(Duration::from_millis(10)).await;
            actual = current_filter(handle);
        }
    }

    /// Spawns the override loop on a fresh reload layer seeded with `initial_base`.
    ///
    /// Returns the controller, a reload handle for inspecting the installed filter, the
    /// join handle of the spawned loop, and the layer itself so the caller can hold on to
    /// it for the duration of the test.
    fn spawn_processor(
        initial_base: EnvFilter,
    ) -> (
        LoggingOverrideController,
        reload::Handle<EnvFilter, Registry>,
        tokio::task::JoinHandle<()>,
        reload::Layer<EnvFilter, Registry>,
    ) {
        let (filter_layer, reload_handle) = reload::Layer::new(initial_base.clone());
        let (tx, rx) = mpsc::channel(1);

        let worker_loop = process_override_actions(initial_base, reload_handle.clone(), rx, ProcessShutdown::noop());
        let processor = tokio::spawn(worker_loop);

        (LoggingOverrideController { tx }, reload_handle, processor, filter_layer)
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        // Install a long-lived override so that only the explicit reset can end it.
        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_secs(60), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // The base changes while the override is active; the reset must restore the new base.
        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        // Dropping the only controller closes the channel, which ends the loop.
        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        // A short override that expires on its own within the polling deadline.
        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_millis(100), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        let updated_base = EnvFilter::new("agent_data_plane=warn");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=warn").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }
}