// saluki_app/logging/api.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5    extract::{Query, State},
6    response::IntoResponse,
7    routing::{post, Router},
8    APIHandler, DynamicRoute, EndpointType, StatusCode,
9};
10use saluki_core::runtime::{
11    state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
12};
13use saluki_error::{generic_error, GenericError};
14use serde::Deserialize;
15use tokio::{
16    select,
17    sync::{mpsc, Mutex},
18    time::sleep,
19};
20use tracing::{error, info};
21use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
22
23#[derive(Deserialize)]
24struct OverrideQueryParams {
25    time_secs: u64,
26}
27
28/// An action driven through a [`LoggingOverrideController`] and processed by [`LoggingOverrideWorker`].
29enum LoggingOverrideAction {
30    /// Apply `filter` as a temporary override on top of the current base, lasting `duration`.
31    ///
32    /// When the duration elapses, or a [`Reset`][LoggingOverrideAction::Reset] arrives, the worker restores the
33    /// current base filter.
34    Override { duration: Duration, filter: EnvFilter },
35
36    /// Clear any active override, immediately restoring the current base filter.
37    Reset,
38
39    /// Replace the current base filter.
40    ///
41    /// Applied immediately if no override is active. If an override is active, the new base is stored and applied
42    /// once the override expires (or is reset), so we don't clobber an in-flight override.
43    UpdateBase(EnvFilter),
44}
45
46/// Controls the dynamic logging filter at runtime.
47///
48/// All filter mutations—temporary overrides, resets, and base updates—flow through here to the
49/// [`LoggingOverrideWorker`] that owns the underlying `tracing` reload handle. Cloning is cheap; hand clones to
50/// any caller that needs to drive filter changes (HTTP handlers, configuration watchers, etc.).
51#[derive(Clone)]
52pub struct LoggingOverrideController {
53    tx: mpsc::Sender<LoggingOverrideAction>,
54}
55
56impl LoggingOverrideController {
57    /// Applies `filter` as a temporary override for `duration`, after which the base filter is restored.
58    pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
59        self.send(LoggingOverrideAction::Override { duration, filter }).await
60    }
61
62    /// Clears any active override, immediately restoring the current base filter.
63    pub(crate) async fn reset(&self) -> Result<(), GenericError> {
64        self.send(LoggingOverrideAction::Reset).await
65    }
66
67    /// Replaces the current base filter.
68    ///
69    /// Applied immediately if no override is active; otherwise stored and applied when the override expires.
70    pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
71        self.send(LoggingOverrideAction::UpdateBase(filter)).await
72    }
73
74    async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
75        self.tx
76            .send(action)
77            .await
78            .map_err(|_| generic_error!("logging override worker is no longer running"))
79    }
80}
81
82/// State used for the logging API handler.
83#[derive(Clone)]
84pub struct LoggingHandlerState {
85    controller: LoggingOverrideController,
86}
87
88/// An API handler for updating log filtering directives at runtime.
89///
90/// This handler exposes two main routes -- `/logging/override` and `/logging/reset` -- which allow for overriding the
91/// default log filtering directives (configured at startup) at runtime, and then resetting them once the override is no
92/// longer needed.
93///
94/// As this has the potential for incredibly verbose logging at runtime, the override is set with a specific duration in
95/// which it will apply. Once an override has been active for the configured duration, it will automatically be reset
96/// unless the override is refreshed before the duration elapses.
97///
98/// The maximum duration for an override is 10 minutes.
99pub struct LoggingAPIHandler {
100    state: LoggingHandlerState,
101}
102
103impl LoggingAPIHandler {
104    /// Creates a new `LoggingAPIHandler` driven by the given controller.
105    fn new(controller: LoggingOverrideController) -> Self {
106        Self {
107            state: LoggingHandlerState { controller },
108        }
109    }
110
111    async fn override_handler(
112        State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
113    ) -> impl IntoResponse {
114        // Make sure the override length is within the acceptable range.
115        const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
116        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
117            return (
118                StatusCode::BAD_REQUEST,
119                format!(
120                    "override time cannot be greater than {} seconds",
121                    MAXIMUM_OVERRIDE_LENGTH_SECS
122                ),
123            );
124        }
125
126        // Parse the override duration and create a new filter from the body.
127        let duration = Duration::from_secs(params.time_secs);
128        let new_filter = match EnvFilter::try_new(body) {
129            Ok(filter) => filter,
130            Err(e) => {
131                return (
132                    StatusCode::BAD_REQUEST,
133                    format!("failed to parse override filter: {}", e),
134                )
135            }
136        };
137
138        // Instruct the override worker to apply the new log filtering directives for the given duration.
139        let _ = state.controller.override_for(duration, new_filter).await;
140
141        (StatusCode::OK, "acknowledged".to_string())
142    }
143
144    async fn reset_handler(State(state): State<LoggingHandlerState>) {
145        // Instruct the override worker to immediately reset back to the current base log filtering directives.
146        let _ = state.controller.reset().await;
147    }
148}
149
150impl APIHandler for LoggingAPIHandler {
151    type State = LoggingHandlerState;
152
153    fn generate_initial_state(&self) -> Self::State {
154        self.state.clone()
155    }
156
157    fn generate_routes(&self) -> Router<Self::State> {
158        Router::new()
159            .route("/logging/override", post(Self::override_handler))
160            .route("/logging/reset", post(Self::reset_handler))
161    }
162}
163
164/// A worker that processes log filter directive overrides.
165///
166/// When running, the worker asserts a set of routes (based on [`LoggingAPIHandler`]) that allow triggering
167/// an override of the current logging filter directives as well as clearing (resetting) the active override.
168pub struct LoggingOverrideWorker {
169    handler: LoggingAPIHandler,
170    state: Arc<Mutex<LoggingOverrideWorkerState>>,
171}
172
173struct LoggingOverrideWorkerState {
174    reload_handle: Handle<EnvFilter, Registry>,
175    rx: mpsc::Receiver<LoggingOverrideAction>,
176}
177
178impl LoggingOverrideWorker {
179    /// Creates a new `LoggingOverrideWorker` driving the given reload handle.
180    ///
181    /// When the worker starts, the filter directives present in the reload handle will be used as the "base" filter:
182    /// the filter that's reapplied after an override expires or is reset. This base filter can then be subsequently
183    /// updated through the [`LoggingOverrideController`] handle that's returned.
184    pub(super) fn new(reload_handle: Handle<EnvFilter, Registry>) -> (Self, LoggingOverrideController) {
185        let (tx, rx) = mpsc::channel(1);
186        let controller = LoggingOverrideController { tx };
187        let handler = LoggingAPIHandler::new(controller.clone());
188        let worker = Self {
189            handler,
190            state: Arc::new(Mutex::new(LoggingOverrideWorkerState { reload_handle, rx })),
191        };
192
193        (worker, controller)
194    }
195}
196
197#[async_trait]
198impl Supervisable for LoggingOverrideWorker {
199    fn name(&self) -> &str {
200        "dynamic-logging-override-processor"
201    }
202
203    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
204        let mut state = self.state.clone().lock_owned().await;
205        let logging_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
206
207        Ok(Box::pin(async move {
208            DataspaceRegistry::try_current()
209                .ok_or_else(|| generic_error!("Dataspace not available."))?
210                .assert(logging_route, "logging-api");
211
212            process_override_actions(&mut state, process_shutdown).await;
213            Ok(())
214        }))
215    }
216}
217
218async fn process_override_actions(state: &mut LoggingOverrideWorkerState, mut process_shutdown: ProcessShutdown) {
219    // Seed the canonical base filter from the reload handle. If the underlying reload layer has been dropped, we
220    // cannot perform overrides or resets meaningfully: just emit an error and wait for shutdown so that we don't
221    // exit prematurely and drive the supervisor into an infinite restart loop.
222    let mut base_filter = match state.reload_handle.clone_current() {
223        Some(filter) => filter,
224        None => {
225            error!("Logging subsystem is in an indeterminate state; dynamic log filtering will not be available.");
226
227            process_shutdown.wait_for_shutdown().await;
228            return;
229        }
230    };
231
232    let mut override_active = false;
233    let override_timeout = sleep(Duration::from_secs(3600));
234
235    tokio::pin!(override_timeout);
236
237    let shutdown = process_shutdown.wait_for_shutdown();
238    tokio::pin!(shutdown);
239
240    loop {
241        select! {
242            _ = &mut shutdown => break,
243            maybe_action = state.rx.recv() => match maybe_action {
244                Some(LoggingOverrideAction::Override { duration, filter }) => {
245                    info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
246
247                    match state.reload_handle.reload(filter) {
248                        Ok(()) => {
249                            // We were able to successfully reload the filter, so mark ourselves as having an active
250                            // override and update the override timeout.
251                            override_active = true;
252                            override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
253                        },
254                        Err(e) => error!(error = %e, "Failed to override log filtering directives."),
255                    }
256                },
257
258                Some(LoggingOverrideAction::Reset) => {
259                    // We've been instructed to immediately reset the filter back to the base one, so simply update
260                    // the override timeout to fire as soon as possible.
261                    override_timeout.as_mut().reset(tokio::time::Instant::now());
262                },
263
264                Some(LoggingOverrideAction::UpdateBase(new_base)) => {
265                    // Before replacing the base filter, check if the new base is different from the current one before we trigger a reload
266                    // and log a big, noisy message.
267                    //
268                    // We do this in a hacky way and compare the stringified version of each filter since `EnvFilter` can't be directly compared.
269                    let existing_base_filter_str = base_filter.to_string();
270                    let new_base_filter_str = new_base.to_string();
271                    if new_base_filter_str == existing_base_filter_str {
272                        continue;
273                    }
274
275                    base_filter = new_base;
276
277                    if override_active {
278                        // An override is currently active. Don't clobber it -- the new base will take effect when
279                        // the override expires.
280                        info!(
281                            directives = %base_filter,
282                            "Updated base log filtering directives; application deferred until active override expires."
283                        );
284                    } else {
285                        let new_directives = base_filter.to_string();
286                        match state.reload_handle.reload(base_filter.clone()) {
287                            Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
288                            Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
289                        }
290                    }
291                },
292
293                // Our sender has dropped, so there's no more actions for us to handle.
294                None => break,
295            },
296            _ = &mut override_timeout => {
297                // Our override timeout has fired. If we have an active override, reset it back to the base filter.
298                //
299                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
300                // reset the timeout with a long duration.
301                if override_active {
302                    override_active = false;
303
304                    let restore_directives = base_filter.to_string();
305                    if let Err(e) = state.reload_handle.reload(base_filter.clone()) {
306                        error!(error = %e, "Failed to reset log filtering directives.");
307                    }
308
309                    info!(directives = %restore_directives, "Restored base log filtering directives.");
310                }
311
312                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
313            }
314        }
315    }
316
317    // Reset our filter to the base one before we exit.
318    if let Err(e) = state.reload_handle.reload(base_filter) {
319        error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    /// Renders the directives of the filter currently installed behind `handle`.
    fn current_filter(handle: &reload::Handle<EnvFilter, Registry>) -> String {
        let installed = handle.clone_current().expect("reload layer should be alive");
        installed.to_string()
    }

    /// Polls `handle` every 10ms until its filter renders as `expected`, panicking once a one second deadline has
    /// passed without a match.
    async fn wait_for_filter(handle: &reload::Handle<EnvFilter, Registry>, expected: &str) {
        let deadline = tokio::time::Instant::now() + Duration::from_secs(1);

        let mut actual = current_filter(handle);
        while actual != expected {
            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(Duration::from_millis(10)).await;
            actual = current_filter(handle);
        }
    }

    /// Spawns the override-processing loop on top of a fresh reload layer seeded with `base_filter`.
    ///
    /// Returns the controller feeding the loop, a reload handle for inspecting the active filter, the join handle of
    /// the spawned task, and the reload layer itself.
    fn spawn_processor(
        base_filter: EnvFilter,
    ) -> (
        LoggingOverrideController,
        reload::Handle<EnvFilter, Registry>,
        tokio::task::JoinHandle<()>,
        reload::Layer<EnvFilter, Registry>,
    ) {
        let (filter_layer, reload_handle) = reload::Layer::new(base_filter);
        let (action_tx, action_rx) = mpsc::channel(1);

        let mut worker_state = LoggingOverrideWorkerState {
            reload_handle: reload_handle.clone(),
            rx: action_rx,
        };
        let processor = tokio::spawn(async move {
            process_override_actions(&mut worker_state, ProcessShutdown::noop()).await;
        });

        // The reload layer must outlive the handles -- callers should bind it (e.g., `let _layer = ...`) to keep
        // the handle alive for the duration of the test.
        (
            LoggingOverrideController { tx: action_tx },
            reload_handle,
            processor,
            filter_layer,
        )
    }

    /// Drops the controller so the processing loop sees its channel close, then waits for the task to finish.
    async fn stop_processor(controller: LoggingOverrideController, processor: tokio::task::JoinHandle<()>) {
        drop(controller);
        processor.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_secs(60), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // A base update sent while the override is active must be the filter that a reset restores.
        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        stop_processor(controller, processor).await;
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        let override_filter = EnvFilter::new("hyper=warn");
        controller
            .override_for(Duration::from_millis(100), override_filter)
            .await
            .expect("send override");
        wait_for_filter(&reload_handle, "hyper=warn").await;

        // No explicit reset here: the updated base should show up once the short override expires on its own.
        let updated_base = EnvFilter::new("agent_data_plane=warn");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=warn").await;

        stop_processor(controller, processor).await;
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let base = EnvFilter::new("agent_data_plane=info");
        let (controller, reload_handle, processor, _filter_layer) = spawn_processor(base);

        let updated_base = EnvFilter::new("agent_data_plane=debug");
        controller.update_base(updated_base).await.expect("send update_base");
        wait_for_filter(&reload_handle, "agent_data_plane=debug").await;

        stop_processor(controller, processor).await;
    }
}