1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5 extract::{Query, State},
6 response::IntoResponse,
7 routing::{post, Router},
8 APIHandler, DynamicRoute, EndpointType, StatusCode,
9};
10use saluki_common::sync::shutdown::ShutdownHandle;
11use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
12use saluki_error::{generic_error, GenericError};
13use serde::Deserialize;
14use tokio::{
15 pin, select,
16 sync::{mpsc, Mutex},
17 time::sleep,
18};
19use tracing::{error, info};
20use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
21
22#[derive(Deserialize)]
23struct OverrideQueryParams {
24 time_secs: u64,
25}
26
27enum LoggingOverrideAction {
29 Override { duration: Duration, filter: EnvFilter },
34
35 Reset,
37
38 UpdateBase(EnvFilter),
43}
44
45#[derive(Clone)]
51pub struct LoggingOverrideController {
52 tx: mpsc::Sender<LoggingOverrideAction>,
53}
54
55impl LoggingOverrideController {
56 pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
58 self.send(LoggingOverrideAction::Override { duration, filter }).await
59 }
60
61 pub(crate) async fn reset(&self) -> Result<(), GenericError> {
63 self.send(LoggingOverrideAction::Reset).await
64 }
65
66 pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
70 self.send(LoggingOverrideAction::UpdateBase(filter)).await
71 }
72
73 async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
74 self.tx
75 .send(action)
76 .await
77 .map_err(|_| generic_error!("logging override worker is no longer running"))
78 }
79}
80
81#[derive(Clone)]
83pub struct LoggingHandlerState {
84 controller: LoggingOverrideController,
85}
86
87pub struct LoggingAPIHandler {
99 state: LoggingHandlerState,
100}
101
102impl LoggingAPIHandler {
103 fn new(controller: LoggingOverrideController) -> Self {
105 Self {
106 state: LoggingHandlerState { controller },
107 }
108 }
109
110 async fn override_handler(
111 State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
112 ) -> impl IntoResponse {
113 const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
115 if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
116 return (
117 StatusCode::BAD_REQUEST,
118 format!(
119 "override time cannot be greater than {} seconds",
120 MAXIMUM_OVERRIDE_LENGTH_SECS
121 ),
122 );
123 }
124
125 let duration = Duration::from_secs(params.time_secs);
127 let new_filter = match EnvFilter::try_new(body) {
128 Ok(filter) => filter,
129 Err(e) => {
130 return (
131 StatusCode::BAD_REQUEST,
132 format!("failed to parse override filter: {}", e),
133 )
134 }
135 };
136
137 let _ = state.controller.override_for(duration, new_filter).await;
139
140 (StatusCode::OK, "acknowledged".to_string())
141 }
142
143 async fn reset_handler(State(state): State<LoggingHandlerState>) {
144 let _ = state.controller.reset().await;
146 }
147}
148
149impl APIHandler for LoggingAPIHandler {
150 type State = LoggingHandlerState;
151
152 fn generate_initial_state(&self) -> Self::State {
153 self.state.clone()
154 }
155
156 fn generate_routes(&self) -> Router<Self::State> {
157 Router::new()
158 .route("/logging/override", post(Self::override_handler))
159 .route("/logging/reset", post(Self::reset_handler))
160 }
161}
162
163pub struct LoggingOverrideWorker {
168 handler: LoggingAPIHandler,
169 state: Arc<Mutex<LoggingOverrideWorkerState>>,
170}
171
172struct LoggingOverrideWorkerState {
173 reload_handle: Handle<EnvFilter, Registry>,
174 rx: mpsc::Receiver<LoggingOverrideAction>,
175}
176
177impl LoggingOverrideWorker {
178 pub(super) fn new(reload_handle: Handle<EnvFilter, Registry>) -> (Self, LoggingOverrideController) {
184 let (tx, rx) = mpsc::channel(1);
185 let controller = LoggingOverrideController { tx };
186 let handler = LoggingAPIHandler::new(controller.clone());
187 let worker = Self {
188 handler,
189 state: Arc::new(Mutex::new(LoggingOverrideWorkerState { reload_handle, rx })),
190 };
191
192 (worker, controller)
193 }
194}
195
196#[async_trait]
197impl Supervisable for LoggingOverrideWorker {
198 fn name(&self) -> &str {
199 "dynamic-logging-override-processor"
200 }
201
202 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
203 let mut state = self.state.clone().lock_owned().await;
204 let logging_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
205
206 Ok(Box::pin(async move {
207 DataspaceRegistry::try_current()
208 .ok_or_else(|| generic_error!("Dataspace not available."))?
209 .assert(logging_route, "logging-api");
210
211 process_override_actions(&mut state, process_shutdown).await;
212 Ok(())
213 }))
214 }
215}
216
217async fn process_override_actions(state: &mut LoggingOverrideWorkerState, process_shutdown: ShutdownHandle) {
218 let mut base_filter = match state.reload_handle.clone_current() {
222 Some(filter) => filter,
223 None => {
224 error!("Logging subsystem is in an indeterminate state; dynamic log filtering will not be available.");
225
226 process_shutdown.await;
227 return;
228 }
229 };
230
231 let mut override_active = false;
232 let override_timeout = sleep(Duration::from_secs(3600));
233
234 pin!(override_timeout, process_shutdown);
235
236 loop {
237 select! {
238 _ = &mut process_shutdown => break,
239 maybe_action = state.rx.recv() => match maybe_action {
240 Some(LoggingOverrideAction::Override { duration, filter }) => {
241 info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
242
243 match state.reload_handle.reload(filter) {
244 Ok(()) => {
245 override_active = true;
248 override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
249 },
250 Err(e) => error!(error = %e, "Failed to override log filtering directives."),
251 }
252 },
253
254 Some(LoggingOverrideAction::Reset) => {
255 override_timeout.as_mut().reset(tokio::time::Instant::now());
258 },
259
260 Some(LoggingOverrideAction::UpdateBase(new_base)) => {
261 let existing_base_filter_str = base_filter.to_string();
266 let new_base_filter_str = new_base.to_string();
267 if new_base_filter_str == existing_base_filter_str {
268 continue;
269 }
270
271 base_filter = new_base;
272
273 if override_active {
274 info!(
277 directives = %base_filter,
278 "Updated base log filtering directives; application deferred until active override expires."
279 );
280 } else {
281 let new_directives = base_filter.to_string();
282 match state.reload_handle.reload(base_filter.clone()) {
283 Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
284 Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
285 }
286 }
287 },
288
289 None => break,
291 },
292 _ = &mut override_timeout => {
293 if override_active {
298 override_active = false;
299
300 let restore_directives = base_filter.to_string();
301 if let Err(e) = state.reload_handle.reload(base_filter.clone()) {
302 error!(error = %e, "Failed to reset log filtering directives.");
303 }
304
305 info!(directives = %restore_directives, "Restored base log filtering directives.");
306 }
307
308 override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
309 }
310 }
311 }
312
313 if let Err(e) = state.reload_handle.reload(base_filter) {
315 error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
316 }
317}
318
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    type FilterHandle = reload::Handle<EnvFilter, Registry>;
    type FilterLayer = reload::Layer<EnvFilter, Registry>;

    /// Renders the filter currently installed behind `handle` as a directive string.
    fn current_filter(handle: &FilterHandle) -> String {
        let filter = handle.clone_current().expect("reload layer should be alive");
        filter.to_string()
    }

    /// Polls every 10ms until the installed filter renders as `expected`, panicking once one second has passed.
    async fn wait_for_filter(handle: &FilterHandle, expected: &str) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);

        let mut actual = current_filter(handle);
        while actual != expected {
            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(Duration::from_millis(10)).await;
            actual = current_filter(handle);
        }
    }

    /// Spawns the override processing loop against a fresh reload layer seeded with `base_filter`.
    ///
    /// The layer is returned alongside the handle so the caller can keep it alive for the duration of the test
    /// (the helpers above expect the handle to still be able to read the current filter).
    fn spawn_processor(
        base_filter: EnvFilter,
    ) -> (
        LoggingOverrideController,
        FilterHandle,
        tokio::task::JoinHandle<()>,
        FilterLayer,
    ) {
        let (filter_layer, reload_handle) = reload::Layer::new(base_filter);
        let (tx, rx) = mpsc::channel(1);

        let mut worker_state = LoggingOverrideWorkerState {
            reload_handle: reload_handle.clone(),
            rx,
        };
        let processor = tokio::spawn(async move {
            process_override_actions(&mut worker_state, ShutdownHandle::noop()).await;
        });

        (LoggingOverrideController { tx }, reload_handle, processor, filter_layer)
    }

    /// Drops the only controller, which closes the channel, and waits for the processor task to finish.
    async fn shut_down(controller: LoggingOverrideController, processor: tokio::task::JoinHandle<()>) {
        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        // Put an override in place that would not expire on its own within the test.
        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_secs(60), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // Change the base while the override is active, then reset: the updated base must be what comes back.
        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        shut_down(controller, processor).await;
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        // Use a short-lived override so it expires well within the polling deadline.
        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_millis(100), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // The base is updated while the override is active; once the override ends, the new base is installed.
        let updated_base = EnvFilter::new("agent_data_plane=warn");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=warn").await;

        shut_down(controller, processor).await;
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        shut_down(controller, processor).await;
    }
}