Skip to main content

saluki_app/logging/
api.rs

1use std::{sync::Arc, time::Duration};
2
3use async_trait::async_trait;
4use saluki_api::{
5    extract::{Query, State},
6    response::IntoResponse,
7    routing::{post, Router},
8    APIHandler, DynamicRoute, EndpointType, StatusCode,
9};
10use saluki_common::sync::shutdown::ShutdownHandle;
11use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
12use saluki_error::{generic_error, GenericError};
13use serde::Deserialize;
14use tokio::{
15    pin, select,
16    sync::{mpsc, Mutex},
17    time::sleep,
18};
19use tracing::{error, info};
20use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
21
22#[derive(Deserialize)]
23struct OverrideQueryParams {
24    time_secs: u64,
25}
26
27/// An action driven through a [`LoggingOverrideController`] and processed by [`LoggingOverrideWorker`].
28enum LoggingOverrideAction {
29    /// Apply `filter` as a temporary override on top of the current base, lasting `duration`.
30    ///
31    /// When the duration elapses, or a [`Reset`][LoggingOverrideAction::Reset] arrives, the worker restores the
32    /// current base filter.
33    Override { duration: Duration, filter: EnvFilter },
34
35    /// Clear any active override, immediately restoring the current base filter.
36    Reset,
37
38    /// Replace the current base filter.
39    ///
40    /// Applied immediately if no override is active. If an override is active, the new base is stored and applied
41    /// once the override expires (or is reset), so we don't clobber an in-flight override.
42    UpdateBase(EnvFilter),
43}
44
45/// Controls the dynamic logging filter at runtime.
46///
47/// All filter mutations—temporary overrides, resets, and base updates—flow through here to the
48/// [`LoggingOverrideWorker`] that owns the underlying `tracing` reload handle. Cloning is cheap; hand clones to
49/// any caller that needs to drive filter changes (HTTP handlers, configuration watchers, etc.).
50#[derive(Clone)]
51pub struct LoggingOverrideController {
52    tx: mpsc::Sender<LoggingOverrideAction>,
53}
54
55impl LoggingOverrideController {
56    /// Applies `filter` as a temporary override for `duration`, after which the base filter is restored.
57    pub(crate) async fn override_for(&self, duration: Duration, filter: EnvFilter) -> Result<(), GenericError> {
58        self.send(LoggingOverrideAction::Override { duration, filter }).await
59    }
60
61    /// Clears any active override, immediately restoring the current base filter.
62    pub(crate) async fn reset(&self) -> Result<(), GenericError> {
63        self.send(LoggingOverrideAction::Reset).await
64    }
65
66    /// Replaces the current base filter.
67    ///
68    /// Applied immediately if no override is active; otherwise stored and applied when the override expires.
69    pub async fn update_base(&self, filter: EnvFilter) -> Result<(), GenericError> {
70        self.send(LoggingOverrideAction::UpdateBase(filter)).await
71    }
72
73    async fn send(&self, action: LoggingOverrideAction) -> Result<(), GenericError> {
74        self.tx
75            .send(action)
76            .await
77            .map_err(|_| generic_error!("logging override worker is no longer running"))
78    }
79}
80
81/// State used for the logging API handler.
82#[derive(Clone)]
83pub struct LoggingHandlerState {
84    controller: LoggingOverrideController,
85}
86
87/// An API handler for updating log filtering directives at runtime.
88///
89/// This handler exposes two main routes -- `/logging/override` and `/logging/reset` -- which allow for overriding the
90/// default log filtering directives (configured at startup) at runtime, and then resetting them once the override is no
91/// longer needed.
92///
93/// As this has the potential for incredibly verbose logging at runtime, the override is set with a specific duration in
94/// which it will apply. Once an override has been active for the configured duration, it will automatically be reset
95/// unless the override is refreshed before the duration elapses.
96///
97/// The maximum duration for an override is 10 minutes.
98pub struct LoggingAPIHandler {
99    state: LoggingHandlerState,
100}
101
102impl LoggingAPIHandler {
103    /// Creates a new `LoggingAPIHandler` driven by the given controller.
104    fn new(controller: LoggingOverrideController) -> Self {
105        Self {
106            state: LoggingHandlerState { controller },
107        }
108    }
109
110    async fn override_handler(
111        State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
112    ) -> impl IntoResponse {
113        // Make sure the override length is within the acceptable range.
114        const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
115        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
116            return (
117                StatusCode::BAD_REQUEST,
118                format!(
119                    "override time cannot be greater than {} seconds",
120                    MAXIMUM_OVERRIDE_LENGTH_SECS
121                ),
122            );
123        }
124
125        // Parse the override duration and create a new filter from the body.
126        let duration = Duration::from_secs(params.time_secs);
127        let new_filter = match EnvFilter::try_new(body) {
128            Ok(filter) => filter,
129            Err(e) => {
130                return (
131                    StatusCode::BAD_REQUEST,
132                    format!("failed to parse override filter: {}", e),
133                )
134            }
135        };
136
137        // Instruct the override worker to apply the new log filtering directives for the given duration.
138        let _ = state.controller.override_for(duration, new_filter).await;
139
140        (StatusCode::OK, "acknowledged".to_string())
141    }
142
143    async fn reset_handler(State(state): State<LoggingHandlerState>) {
144        // Instruct the override worker to immediately reset back to the current base log filtering directives.
145        let _ = state.controller.reset().await;
146    }
147}
148
149impl APIHandler for LoggingAPIHandler {
150    type State = LoggingHandlerState;
151
152    fn generate_initial_state(&self) -> Self::State {
153        self.state.clone()
154    }
155
156    fn generate_routes(&self) -> Router<Self::State> {
157        Router::new()
158            .route("/logging/override", post(Self::override_handler))
159            .route("/logging/reset", post(Self::reset_handler))
160    }
161}
162
163/// A worker that processes log filter directive overrides.
164///
165/// When running, the worker asserts a set of routes (based on [`LoggingAPIHandler`]) that allow triggering
166/// an override of the current logging filter directives as well as clearing (resetting) the active override.
167pub struct LoggingOverrideWorker {
168    handler: LoggingAPIHandler,
169    state: Arc<Mutex<LoggingOverrideWorkerState>>,
170}
171
172struct LoggingOverrideWorkerState {
173    reload_handle: Handle<EnvFilter, Registry>,
174    rx: mpsc::Receiver<LoggingOverrideAction>,
175}
176
177impl LoggingOverrideWorker {
178    /// Creates a new `LoggingOverrideWorker` driving the given reload handle.
179    ///
180    /// When the worker starts, the filter directives present in the reload handle will be used as the "base" filter:
181    /// the filter that's reapplied after an override expires or is reset. This base filter can then be subsequently
182    /// updated through the [`LoggingOverrideController`] handle that's returned.
183    pub(super) fn new(reload_handle: Handle<EnvFilter, Registry>) -> (Self, LoggingOverrideController) {
184        let (tx, rx) = mpsc::channel(1);
185        let controller = LoggingOverrideController { tx };
186        let handler = LoggingAPIHandler::new(controller.clone());
187        let worker = Self {
188            handler,
189            state: Arc::new(Mutex::new(LoggingOverrideWorkerState { reload_handle, rx })),
190        };
191
192        (worker, controller)
193    }
194}
195
196#[async_trait]
197impl Supervisable for LoggingOverrideWorker {
198    fn name(&self) -> &str {
199        "dynamic-logging-override-processor"
200    }
201
202    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
203        let mut state = self.state.clone().lock_owned().await;
204        let logging_route = DynamicRoute::http(EndpointType::Privileged, &self.handler);
205
206        Ok(Box::pin(async move {
207            DataspaceRegistry::try_current()
208                .ok_or_else(|| generic_error!("Dataspace not available."))?
209                .assert(logging_route, "logging-api");
210
211            process_override_actions(&mut state, process_shutdown).await;
212            Ok(())
213        }))
214    }
215}
216
217async fn process_override_actions(state: &mut LoggingOverrideWorkerState, process_shutdown: ShutdownHandle) {
218    // Seed the canonical base filter from the reload handle. If the underlying reload layer has been dropped, we
219    // cannot perform overrides or resets meaningfully: just emit an error and wait for shutdown so that we don't
220    // exit prematurely and drive the supervisor into an infinite restart loop.
221    let mut base_filter = match state.reload_handle.clone_current() {
222        Some(filter) => filter,
223        None => {
224            error!("Logging subsystem is in an indeterminate state; dynamic log filtering will not be available.");
225
226            process_shutdown.await;
227            return;
228        }
229    };
230
231    let mut override_active = false;
232    let override_timeout = sleep(Duration::from_secs(3600));
233
234    pin!(override_timeout, process_shutdown);
235
236    loop {
237        select! {
238            _ = &mut process_shutdown => break,
239            maybe_action = state.rx.recv() => match maybe_action {
240                Some(LoggingOverrideAction::Override { duration, filter }) => {
241                    info!(directives = %filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
242
243                    match state.reload_handle.reload(filter) {
244                        Ok(()) => {
245                            // We were able to successfully reload the filter, so mark ourselves as having an active
246                            // override and update the override timeout.
247                            override_active = true;
248                            override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
249                        },
250                        Err(e) => error!(error = %e, "Failed to override log filtering directives."),
251                    }
252                },
253
254                Some(LoggingOverrideAction::Reset) => {
255                    // We've been instructed to immediately reset the filter back to the base one, so simply update
256                    // the override timeout to fire as soon as possible.
257                    override_timeout.as_mut().reset(tokio::time::Instant::now());
258                },
259
260                Some(LoggingOverrideAction::UpdateBase(new_base)) => {
261                    // Before replacing the base filter, check if the new base is different from the current one before we trigger a reload
262                    // and log a big, noisy message.
263                    //
264                    // We do this in a hacky way and compare the stringified version of each filter since `EnvFilter` can't be directly compared.
265                    let existing_base_filter_str = base_filter.to_string();
266                    let new_base_filter_str = new_base.to_string();
267                    if new_base_filter_str == existing_base_filter_str {
268                        continue;
269                    }
270
271                    base_filter = new_base;
272
273                    if override_active {
274                        // An override is currently active. Don't clobber it -- the new base will take effect when
275                        // the override expires.
276                        info!(
277                            directives = %base_filter,
278                            "Updated base log filtering directives; application deferred until active override expires."
279                        );
280                    } else {
281                        let new_directives = base_filter.to_string();
282                        match state.reload_handle.reload(base_filter.clone()) {
283                            Ok(()) => info!(directives = %new_directives, "Updated base log filtering directives."),
284                            Err(e) => error!(error = %e, "Failed to update base log filtering directives."),
285                        }
286                    }
287                },
288
289                // Our sender has dropped, so there's no more actions for us to handle.
290                None => break,
291            },
292            _ = &mut override_timeout => {
293                // Our override timeout has fired. If we have an active override, reset it back to the base filter.
294                //
295                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
296                // reset the timeout with a long duration.
297                if override_active {
298                    override_active = false;
299
300                    let restore_directives = base_filter.to_string();
301                    if let Err(e) = state.reload_handle.reload(base_filter.clone()) {
302                        error!(error = %e, "Failed to reset log filtering directives.");
303                    }
304
305                    info!(directives = %restore_directives, "Restored base log filtering directives.");
306                }
307
308                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
309            }
310        }
311    }
312
313    // Reset our filter to the base one before we exit.
314    if let Err(e) = state.reload_handle.reload(base_filter) {
315        error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
316    }
317}
318
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::mpsc, time::sleep};
    use tracing_subscriber::{reload, EnvFilter, Registry};

    use super::*;

    /// How long `wait_for_filter` keeps polling before failing the test.
    const FILTER_WAIT_DEADLINE: Duration = Duration::from_secs(1);

    /// Delay between successive polls in `wait_for_filter`.
    const FILTER_POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Returns the string form of the filter currently installed behind `handle`.
    ///
    /// Panics if the reload layer has already been dropped.
    fn current_filter(handle: &reload::Handle<EnvFilter, Registry>) -> String {
        let installed = handle.clone_current().expect("reload layer should be alive");
        installed.to_string()
    }

    /// Polls `handle` until its filter renders as `expected`; panics if that doesn't happen before
    /// `FILTER_WAIT_DEADLINE` elapses.
    async fn wait_for_filter(handle: &reload::Handle<EnvFilter, Registry>, expected: &str) {
        let deadline = tokio::time::Instant::now() + FILTER_WAIT_DEADLINE;

        let mut actual = current_filter(handle);
        while actual != expected {
            assert!(
                tokio::time::Instant::now() < deadline,
                "filter did not become `{expected}`; last value was `{actual}`"
            );

            sleep(FILTER_POLL_INTERVAL).await;
            actual = current_filter(handle);
        }
    }

    /// Spawns `process_override_actions` over a fresh reload layer seeded with `base_filter`.
    ///
    /// Returns the controller, a reload handle for inspecting the active filter, the processor task, and the reload
    /// layer itself. The layer must be kept alive (e.g. bound to `_layer`) for the handle to keep working.
    fn spawn_processor(
        base_filter: EnvFilter,
    ) -> (
        LoggingOverrideController,
        reload::Handle<EnvFilter, Registry>,
        tokio::task::JoinHandle<()>,
        reload::Layer<EnvFilter, Registry>,
    ) {
        let (layer, handle) = reload::Layer::new(base_filter);
        let (action_tx, action_rx) = mpsc::channel(1);

        let controller = LoggingOverrideController { tx: action_tx };
        let mut worker_state = LoggingOverrideWorkerState {
            reload_handle: handle.clone(),
            rx: action_rx,
        };

        let task = tokio::spawn(async move {
            process_override_actions(&mut worker_state, ShutdownHandle::noop()).await;
        });

        (controller, handle, task, layer)
    }

    #[tokio::test]
    async fn reset_restores_current_base_filter() {
        let (controller, handle, task, _layer) = spawn_processor(EnvFilter::new("agent_data_plane=info"));

        // Install a long-lived override so only the explicit reset can remove it.
        controller
            .override_for(Duration::from_secs(60), EnvFilter::new("hyper=warn"))
            .await
            .expect("send override");
        wait_for_filter(&handle, "hyper=warn").await;

        // Update the base while the override is active, then reset: the *new* base must be restored.
        controller
            .update_base(EnvFilter::new("agent_data_plane=debug"))
            .await
            .expect("send update_base");
        controller.reset().await.expect("send reset");
        wait_for_filter(&handle, "agent_data_plane=debug").await;

        // Dropping the last controller closes the channel, which ends the processor.
        drop(controller);
        task.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn override_expiration_restores_current_base_filter() {
        let (controller, handle, task, _layer) = spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .override_for(Duration::from_millis(100), EnvFilter::new("hyper=warn"))
            .await
            .expect("send override");
        wait_for_filter(&handle, "hyper=warn").await;

        // NOTE(review): depending on scheduling, this update may land before or after the 100ms override expires;
        // the final filter is the new base either way, so this does not strictly prove deferral.
        controller
            .update_base(EnvFilter::new("agent_data_plane=warn"))
            .await
            .expect("send update_base");
        wait_for_filter(&handle, "agent_data_plane=warn").await;

        drop(controller);
        task.await.expect("override processor should exit cleanly");
    }

    #[tokio::test]
    async fn update_base_applies_immediately_when_no_override_active() {
        let (controller, handle, task, _layer) = spawn_processor(EnvFilter::new("agent_data_plane=info"));

        controller
            .update_base(EnvFilter::new("agent_data_plane=debug"))
            .await
            .expect("send update_base");
        wait_for_filter(&handle, "agent_data_plane=debug").await;

        drop(controller);
        task.await.expect("override processor should exit cleanly");
    }
}