1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5 extract::{Query, State},
6 response::IntoResponse,
7 routing::{post, Router},
8 APIHandler, DynamicRoute, EndpointType, StatusCode,
9};
10use saluki_core::runtime::{
11 state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
12};
13use saluki_error::{generic_error, GenericError};
14use serde::Deserialize;
15use tokio::{
16 select,
17 sync::{mpsc, Mutex},
18 time::sleep,
19};
20use tracing::{error, info};
21use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
22
/// Query-string parameters accepted by the logging override endpoint.
#[derive(Deserialize)]
struct OverrideQueryParams {
    /// Requested lifetime of the override, in whole seconds.
    time_secs: u64,
}
27
/// Commands sent from a `LoggingOverrideController` to the override worker loop.
enum LoggingOverrideAction {
    /// Install `filter` as the active filter and restore the base filter once `duration` has elapsed.
    Override { duration: Duration, filter: EnvFilter },

    /// End any active override now by making the override timer fire immediately.
    Reset,

    /// Replace the base filter. It is applied right away when no override is active; otherwise it only takes
    /// effect once the active override ends.
    UpdateBase(EnvFilter),
}
45
/// Cloneable handle for submitting logging override actions to the override worker over an `mpsc` channel.
#[derive(Clone)]
pub struct LoggingOverrideController {
    /// Sending half of the action channel; the worker state holds the receiving half.
    tx: mpsc::Sender<LoggingOverrideAction>,
}
55
56impl LoggingOverrideController {
57 pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
59 self.send(LoggingOverrideAction::Override { duration, filter }).await
60 }
61
62 pub(crate) async fn reset(&self) -> Result<(), GenericError> {
64 self.send(LoggingOverrideAction::Reset).await
65 }
66
67 pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
71 self.send(LoggingOverrideAction::UpdateBase(filter)).await
72 }
73
74 async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
75 self.tx
76 .send(action)
77 .await
78 .map_err(|_| generic_error!("logging override worker is no longer running"))
79 }
80}
81
/// Shared state handed to the logging API route handlers.
#[derive(Clone)]
pub struct LoggingHandlerState {
    /// Controller the handlers use to forward requests to the override worker.
    controller: LoggingOverrideController,
}
87
/// API handler exposing the `/logging/override` and `/logging/reset` routes.
pub struct LoggingAPIHandler {
    /// State cloned out as the initial router state.
    state: LoggingHandlerState,
}
102
103impl LoggingAPIHandler {
104 fn new(controller: LoggingOverrideController) -> Self {
106 Self {
107 state: LoggingHandlerState { controller },
108 }
109 }
110
111 async fn override_handler(
112 State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
113 ) -> impl IntoResponse {
114 const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
116 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
117 return (
118 StatusCode::BAD_REQUEST,
119 format!(
120 "override time cannot be greater than {} seconds",
121 MAXIMUM_OVERRIDE_LENGTH_SECS
122 ),
123 );
124 }
125
126 let duration = Duration::from_secs(params.time_secs);
128 let new_filter = match EnvFilter::try_new(body) {
129 Ok(filter) => filter,
130 Err(e) => {
131 return (
132 StatusCode::BAD_REQUEST,
133 format!("failed to parse override filter: {}", e),
134 )
135 }
136 };
137
138 let _ = state.controller.override_for(duration, new_filter).await;
140
141 (StatusCode::OK, "acknowledged".to_string())
142 }
143
144 async fn reset_handler(State(state): State<LoggingHandlerState>) {
145 let _ = state.controller.reset().await;
147 }
148}
149
150impl APIHandler for LoggingAPIHandler {
151 type State = LoggingHandlerState;
152
153 fn generate_initial_state(&self) -> Self::State {
154 self.state.clone()
155 }
156
157 fn generate_routes(&self) -> Router<Self::State> {
158 Router::new()
159 .route("/logging/override", post(Self::override_handler))
160 .route("/logging/reset", post(Self::reset_handler))
161 }
162}
163
/// Supervised worker that applies logging override actions and exposes the logging API routes.
pub struct LoggingOverrideWorker {
    /// API handler whose routes are registered when the worker future runs.
    handler: LoggingAPIHandler,
    /// Worker state behind an async mutex; it is locked in `initialize` and the guard is moved into the
    /// returned future.
    state: Arc<Mutex<LoggingOverrideWorkerState>>,
}
172
/// Mutable state used by the override worker loop.
struct LoggingOverrideWorkerState {
    /// Handle used to read and replace the filter currently installed in the reloadable layer.
    reload_handle: Handle<EnvFilter, Registry>,
    /// Receiving half of the action channel fed by `LoggingOverrideController`.
    rx: mpsc::Receiver<LoggingOverrideAction>,
}
177
178impl LoggingOverrideWorker {
179 pub(super) fn new(reload_handle: Handle<EnvFilter, Registry>) -> (Self, LoggingOverrideController) {
185 let (tx, rx) = mpsc::channel(1);
186 let controller = LoggingOverrideController { tx };
187 let handler = LoggingAPIHandler::new(controller.clone());
188 let worker = Self {
189 handler,
190 state: Arc::new(Mutex::new(LoggingOverrideWorkerState { reload_handle, rx })),
191 };
192
193 (worker, controller)
194 }
195}
196
#[async_trait]
impl Supervisable for LoggingOverrideWorker {
    /// Name reported for this worker.
    fn name(&self) -> &str {
        "dynamic-logging-override-processor"
    }

    /// Builds the future that drives the override worker.
    ///
    /// The worker state mutex is locked here and the owned guard is moved into the returned future, so the state
    /// stays locked for as long as that future is alive. A later call to `initialize` waits on that lock.
    ///
    /// The returned future resolves to an error if no dataspace registry is available when it runs.
    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
        let mut state = self.state.clone().lock_owned().await;
        let logging_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);

        Ok(Box::pin(async move {
            // The route is asserted into the current dataspace registry when the future runs, not when it is
            // built. The registry value is a temporary and is dropped before the processing loop starts.
            DataspaceRegistry::try_current()
                .ok_or_else(|| generic_error!("Dataspace not available."))?
                .assert(logging_route, "logging-api");

            // Runs until shutdown is signalled or all controllers are dropped.
            process_override_actions(&mut state, process_shutdown).await;
            Ok(())
        }))
    }
}
217
218async fn process_override_actions(state: &mut LoggingOverrideWorkerState, mut process_shutdown: ProcessShutdown) {
219 let mut base_filter = match state.reload_handle.clone_current() {
223 Some(filter) => filter,
224 None => {
225 error!("Logging subsystem is in an indeterminate state; dynamic log filtering will not be available.");
226
227 process_shutdown.wait_for_shutdown().await;
228 return;
229 }
230 };
231
232 let mut override_active = false;
233 let override_timeout = sleep(Duration::from_secs(3600));
234
235 tokio::pin!(override_timeout);
236
237 let shutdown = process_shutdown.wait_for_shutdown();
238 tokio::pin!(shutdown);
239
240 loop {
241 select! {
242 _ = &mut shutdown => break,
243 maybe_action = state.rx.recv() => match maybe_action {
244 Some(LoggingOverrideAction::Override { duration, filter }) => {
245 info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
246
247 match state.reload_handle.reload(filter) {
248 Ok(()) => {
249 override_active = true;
252 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
253 },
254 Err(e) => error!(error = %e, "Failed to override log filtering directives."),
255 }
256 },
257
258 Some(LoggingOverrideAction::Reset) => {
259 override_timeout.as_mut().reset(tokio::time::Instant::now());
262 },
263
264 Some(LoggingOverrideAction::UpdateBase(new_base)) => {
265 let existing_base_filter_str = base_filter.to_string();
270 let new_base_filter_str = new_base.to_string();
271 if new_base_filter_str == existing_base_filter_str {
272 continue;
273 }
274
275 base_filter = new_base;
276
277 if override_active {
278 info!(
281 directives = %base_filter,
282 "Updated base log filtering directives; application deferred until active override expires."
283 );
284 } else {
285 let new_directives = base_filter.to_string();
286 match state.reload_handle.reload(base_filter.clone()) {
287 Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
288 Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
289 }
290 }
291 },
292
293 None => break,
295 },
296 _ = &mut override_timeout => {
297 if override_active {
302 override_active = false;
303
304 let restore_directives = base_filter.to_string();
305 if let Err(e) = state.reload_handle.reload(base_filter.clone()) {
306 error!(error = %e, "Failed to reset log filtering directives.");
307 }
308
309 info!(directives = %restore_directives, "Restored base log filtering directives.");
310 }
311
312 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
313 }
314 }
315 }
316
317 if let Err(e) = state.reload_handle.reload(base_filter) {
319 error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
320 }
321}
322
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    /// Renders the filter currently installed behind `handle` as a directive string.
    fn current_filter(handle: &reload::Handle<EnvFilter, Registry>) -> String {
        let installed = handle.clone_current().expect("reload layer should be alive");
        installed.to_string()
    }

    /// Polls every 10ms until the installed filter renders as `expected`, panicking once a one second deadline
    /// has passed without a match.
    async fn wait_for_filter(handle: &reload::Handle<EnvFilter, Registry>, expected: &str) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);

        let mut actual = current_filter(handle);
        while actual != expected {
            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(Duration::from_millis(10)).await;
            actual = current_filter(handle);
        }
    }

    /// Spawns the override loop on top of a fresh reload layer seeded with `base_filter`.
    ///
    /// The layer itself is returned as well so the caller can keep it alive for the duration of the test.
    fn spawn_processor(
        base_filter: EnvFilter,
    ) -> (
        LoggingOverrideController,
        reload::Handle<EnvFilter, Registry>,
        tokio::task::JoinHandle<()>,
        reload::Layer<EnvFilter, Registry>,
    ) {
        let (filter_layer, reload_handle) = reload::Layer::new(base_filter);
        let (action_tx, action_rx) = mpsc::channel(1);

        let worker_handle = reload_handle.clone();
        let processor = tokio::spawn(async move {
            let mut worker_state = LoggingOverrideWorkerState {
                reload_handle: worker_handle,
                rx: action_rx,
            };
            process_override_actions(&mut worker_state, ProcessShutdown::noop()).await;
        });

        (
            LoggingOverrideController { tx: action_tx },
            reload_handle,
            processor,
            filter_layer,
        )
    }

    /// Drops the only controller, which closes the action channel, and waits for the loop to exit.
    async fn shut_down(controller: LoggingOverrideController, processor: tokio::task::JoinHandle<()>) {
        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_secs(60), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // The base changes while the override is active, so the reset must restore the new base.
        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        shut_down(controller, processor).await;
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_millis(100), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // No reset is sent: the updated base must show up once the short override expires by itself.
        let updated_base = EnvFilter::new("agent_data_plane=warn");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=warn").await;

        shut_down(controller, processor).await;
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        shut_down(controller, processor).await;
    }
}