Skip to main content

saluki_app/logging/
api.rs

1use std::{sync::Mutex, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5    extract::{Query, State},
6    response::IntoResponse,
7    routing::{post, Router},
8    APIHandler, StatusCode,
9};
10use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
11use saluki_error::{generic_error, GenericError};
12use serde::Deserialize;
13use tokio::{select, sync::mpsc, time::sleep};
14use tracing::{error, info};
15use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
16
17static API_HANDLER: Mutex<Option<LoggingAPIHandler>> = Mutex::new(None);
18
19pub(super) fn set_logging_api_handler(handler: LoggingAPIHandler) {
20    API_HANDLER.lock().unwrap().replace(handler);
21}
22
23/// Acquires the logging API handler.
24///
25/// This function is mutable, and consumes the handler if it's present. This means it should only be called once, and
26/// only after logging has been initialized via `initialize_dynamic_logging`.
27///
28/// The logging API handler can be used to install API routes which allow dynamically controlling the logging level
29/// filtering. See [`LoggingAPIHandler`] for more information.
30pub fn acquire_logging_api_handler() -> Option<LoggingAPIHandler> {
31    API_HANDLER.lock().unwrap().take()
32}
33
34#[derive(Deserialize)]
35struct OverrideQueryParams {
36    time_secs: u64,
37}
38
39/// An action driven through a [`LoggingOverrideController`] and processed by [`LoggingOverrideWorker`].
40enum LoggingOverrideAction {
41    /// Apply `filter` as a temporary override on top of the current base, lasting `duration`.
42    ///
43    /// When the duration elapses, or a [`Reset`][LoggingOverrideAction::Reset] arrives, the worker restores the
44    /// current base filter.
45    Override { duration: Duration, filter: EnvFilter },
46
47    /// Clear any active override, immediately restoring the current base filter.
48    Reset,
49
50    /// Replace the current base filter.
51    ///
52    /// Applied immediately if no override is active. If an override is active, the new base is stored and applied
53    /// once the override expires (or is reset), so we don't clobber an in-flight override.
54    UpdateBase(EnvFilter),
55}
56
57/// Controls the dynamic logging filter at runtime.
58///
59/// All filter mutations — temporary overrides, resets, and base updates — flow through here to the
60/// [`LoggingOverrideWorker`] that owns the underlying `tracing` reload handle. Cloning is cheap; hand clones to
61/// any caller that needs to drive filter changes (HTTP handlers, configuration watchers, etc.).
62#[derive(Clone)]
63pub struct LoggingOverrideController {
64    tx: mpsc::Sender<LoggingOverrideAction>,
65}
66
67impl LoggingOverrideController {
68    /// Applies `filter` as a temporary override for `duration`, after which the base filter is restored.
69    pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
70        self.send(LoggingOverrideAction::Override { duration, filter }).await
71    }
72
73    /// Clears any active override, immediately restoring the current base filter.
74    pub(crate) async fn reset(&self) -> Result<(), GenericError> {
75        self.send(LoggingOverrideAction::Reset).await
76    }
77
78    /// Replaces the current base filter.
79    ///
80    /// Applied immediately if no override is active; otherwise stored and applied when the override expires.
81    pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
82        self.send(LoggingOverrideAction::UpdateBase(filter)).await
83    }
84
85    async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
86        self.tx
87            .send(action)
88            .await
89            .map_err(|_| generic_error!("logging override worker is no longer running"))
90    }
91}
92
93/// State used for the logging API handler.
94#[derive(Clone)]
95pub struct LoggingHandlerState {
96    controller: LoggingOverrideController,
97}
98
99/// An API handler for updating log filtering directives at runtime.
100///
101/// This handler exposes two main routes -- `/logging/override` and `/logging/reset` -- which allow for overriding the
102/// default log filtering directives (configured at startup) at runtime, and then resetting them once the override is no
103/// longer needed.
104///
105/// As this has the potential for incredibly verbose logging at runtime, the override is set with a specific duration in
106/// which it will apply. Once an override has been active for the configured duration, it will automatically be reset
107/// unless the override is refreshed before the duration elapses.
108///
109/// The maximum duration for an override is 10 minutes.
110pub struct LoggingAPIHandler {
111    state: LoggingHandlerState,
112}
113
114impl LoggingAPIHandler {
115    /// Creates a new `LoggingAPIHandler` driven by the given controller.
116    pub(super) fn new(controller: LoggingOverrideController) -> Self {
117        Self {
118            state: LoggingHandlerState { controller },
119        }
120    }
121
122    async fn override_handler(
123        State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
124    ) -> impl IntoResponse {
125        // Make sure the override length is within the acceptable range.
126        const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
127        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
128            return (
129                StatusCode::BAD_REQUEST,
130                format!(
131                    "override time cannot be greater than {} seconds",
132                    MAXIMUM_OVERRIDE_LENGTH_SECS
133                ),
134            );
135        }
136
137        // Parse the override duration and create a new filter from the body.
138        let duration = Duration::from_secs(params.time_secs);
139        let new_filter = match EnvFilter::try_new(body) {
140            Ok(filter) => filter,
141            Err(e) => {
142                return (
143                    StatusCode::BAD_REQUEST,
144                    format!("failed to parse override filter: {}", e),
145                )
146            }
147        };
148
149        // Instruct the override worker to apply the new log filtering directives for the given duration.
150        let _ = state.controller.override_for(duration, new_filter).await;
151
152        (StatusCode::OK, "acknowledged".to_string())
153    }
154
155    async fn reset_handler(State(state): State<LoggingHandlerState>) {
156        // Instruct the override worker to immediately reset back to the current base log filtering directives.
157        let _ = state.controller.reset().await;
158    }
159}
160
161impl APIHandler for LoggingAPIHandler {
162    type State = LoggingHandlerState;
163
164    fn generate_initial_state(&self) -> Self::State {
165        self.state.clone()
166    }
167
168    fn generate_routes(&self) -> Router<Self::State> {
169        Router::new()
170            .route("/logging/override", post(Self::override_handler))
171            .route("/logging/reset", post(Self::reset_handler))
172    }
173}
174
175/// A worker that processes log filter directive overrides.
176///
177/// Owns the receiving half of the controller's channel and the canonical base filter. The worker exits cleanly
178/// on either supervisor shutdown or channel close.
179///
180/// One-shot: a successful initialization consumes the worker state. Restart by the supervisor will fail with an
181/// initialization error, propagating up to bring the supervisor down.
182pub struct LoggingOverrideWorker {
183    state: Mutex<Option<LoggingOverrideWorkerState>>,
184}
185
186struct LoggingOverrideWorkerState {
187    initial_base: EnvFilter,
188    reload_handle: Handle<EnvFilter, Registry>,
189    rx: mpsc::Receiver<LoggingOverrideAction>,
190}
191
192impl LoggingOverrideWorker {
193    /// Creates a new worker paired with a [`LoggingOverrideController`].
194    ///
195    /// `initial_base` is the base filter to restore after an override expires; it can be replaced at runtime via
196    /// [`LoggingOverrideController::update_base`]. `reload_handle` is the `tracing_subscriber` reload handle that
197    /// the worker drives directly.
198    ///
199    /// The worker must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] for the controller's actions
200    /// to take effect; without it, sends are accepted but never applied.
201    pub(super) fn new(
202        initial_base: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
203    ) -> (Self, LoggingOverrideController) {
204        let (tx, rx) = mpsc::channel(1);
205        let controller = LoggingOverrideController { tx };
206        let worker = Self {
207            state: Mutex::new(Some(LoggingOverrideWorkerState {
208                initial_base,
209                reload_handle,
210                rx,
211            })),
212        };
213
214        (worker, controller)
215    }
216}
217
218#[async_trait]
219impl Supervisable for LoggingOverrideWorker {
220    fn name(&self) -> &str {
221        "dynamic-logging-override-processor"
222    }
223
224    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
225        let LoggingOverrideWorkerState {
226            initial_base,
227            reload_handle,
228            rx,
229        } = self
230            .state
231            .lock()
232            .unwrap()
233            .take()
234            .ok_or_else(|| InitializationError::Failed {
235                source: generic_error!("logging override worker has already been initialized"),
236            })?;
237
238        Ok(Box::pin(async move {
239            process_override_actions(initial_base, reload_handle, rx, process_shutdown).await;
240            Ok(())
241        }))
242    }
243}
244
245async fn process_override_actions(
246    mut base_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
247    mut rx: mpsc::Receiver<LoggingOverrideAction>, mut process_shutdown: ProcessShutdown,
248) {
249    let mut override_active = false;
250    let override_timeout = sleep(Duration::from_secs(3600));
251
252    tokio::pin!(override_timeout);
253
254    let shutdown = process_shutdown.wait_for_shutdown();
255    tokio::pin!(shutdown);
256
257    loop {
258        select! {
259            _ = &mut shutdown => break,
260            maybe_action = rx.recv() => match maybe_action {
261                Some(LoggingOverrideAction::Override { duration, filter }) => {
262                    info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
263
264                    match reload_handle.reload(filter) {
265                        Ok(()) => {
266                            // We were able to successfully reload the filter, so mark ourselves as having an active
267                            // override and update the override timeout.
268                            override_active = true;
269                            override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
270                        },
271                        Err(e) => error!(error = %e, "Failed to override log filtering directives."),
272                    }
273                },
274
275                Some(LoggingOverrideAction::Reset) => {
276                    // We've been instructed to immediately reset the filter back to the base one, so simply update
277                    // the override timeout to fire as soon as possible.
278                    override_timeout.as_mut().reset(tokio::time::Instant::now());
279                },
280
281                Some(LoggingOverrideAction::UpdateBase(new_base)) => {
282                    base_filter = new_base;
283
284                    if override_active {
285                        // An override is currently active. Don't clobber it -- the new base will take effect when
286                        // the override expires.
287                        info!(
288                            directives = %base_filter,
289                            "Updated base log filtering directives; application deferred until active override expires."
290                        );
291                    } else {
292                        let new_directives = base_filter.to_string();
293                        match reload_handle.reload(base_filter.clone()) {
294                            Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
295                            Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
296                        }
297                    }
298                },
299
300                // Our sender has dropped, so there's no more actions for us to handle.
301                None => break,
302            },
303            _ = &mut override_timeout => {
304                // Our override timeout has fired. If we have an active override, reset it back to the base filter.
305                //
306                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
307                // reset the timeout with a long duration.
308                if override_active {
309                    override_active = false;
310
311                    let restore_directives = base_filter.to_string();
312                    if let Err(e) = reload_handle.reload(base_filter.clone()) {
313                        error!(error = %e, "Failed to reset log filtering directives.");
314                    }
315
316                    info!(directives = %restore_directives, "Restored base log filtering directives.");
317                }
318
319                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
320            }
321        }
322    }
323
324    // Reset our filter to the base one before we exit.
325    if let Err(e) = reload_handle.reload(base_filter) {
326        error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
327    }
328}
329
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    /// Returns the directives of the filter currently installed behind `handle`.
    fn current_filter(handle: &reload::Handle<EnvFilter, Registry>) -> String {
        handle
            .clone_current()
            .expect("reload layer should be alive")
            .to_string()
    }

    /// Polls `handle` until its filter renders as `expected`, panicking if that takes longer than one second.
    async fn wait_for_filter(handle: &reload::Handle<EnvFilter, Registry>, expected: &str) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);

        loop {
            let actual = current_filter(handle);
            if actual == expected {
                return;
            }

            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(Duration::from_millis(10)).await;
        }
    }

    /// Spawns `process_override_actions` over a fresh reload layer seeded with `initial_base`.
    ///
    /// Returns the controller, the reload handle, the worker's join handle, and the reload layer itself.
    fn spawn_processor(
        initial_base: EnvFilter,
    ) -> (
        LoggingOverrideController,
        reload::Handle<EnvFilter, Registry>,
        tokio::task::JoinHandle<()>,
        reload::Layer<EnvFilter, Registry>,
    ) {
        let (filter_layer, reload_handle) = reload::Layer::new(initial_base.clone());
        let (tx, rx) = mpsc::channel(1);
        let controller = LoggingOverrideController { tx };
        let processor = tokio::spawn(process_override_actions(
            initial_base,
            reload_handle.clone(),
            rx,
            ProcessShutdown::noop(),
        ));

        // The reload layer must outlive the handles -- callers should bind it (e.g., `let _layer = ...`) to keep
        // the handle alive for the duration of the test.
        (controller, reload_handle, processor, filter_layer)
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .override_for(Duration::from_secs(60), EnvFilter::new("hyper=warn"))
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        controller
            .update_base(EnvFilter::new("agent_data_plane=debug"))
            .await
            .expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .override_for(Duration::from_millis(100), EnvFilter::new("hyper=warn"))
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        controller
            .update_base(EnvFilter::new("agent_data_plane=warn"))
            .await
            .expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=warn").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .update_base(EnvFilter::new("agent_data_plane=debug"))
            .await
            .expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn newer_override_replaces_active_override() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .override_for(Duration::from_secs(60), EnvFilter::new("hyper=warn"))
            .await
            .expect("send first override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        controller
            .override_for(Duration::from_secs(60), EnvFilter::new("hyper=debug"))
            .await
            .expect("send second override");
        wait_for_filter(&reload_handle, "hyper=debug").await;

        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=info").await;

        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn worker_exit_restores_current_base_filter() {
        let (controller, reload_handle, processor, _filter_layer) =
            spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .override_for(Duration::from_secs(60), EnvFilter::new("hyper=warn"))
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // Dropping the only controller closes the channel and stops the worker. Before exiting, the worker must put
        // the base filter back even though the override has not expired yet. Joining the task guarantees that final
        // reload has happened, so the check below needs no polling.
        drop(controller);
        processor.await.expect("override processor should exit cleanly");
        assert_eq!(current_filter(&reload_handle), "agent_data_plane=info");
    }
}