saluki_app/logging/
api.rs

1use std::{sync::Mutex, time::Duration};
2
3use saluki_api::{
4    extract::{Query, State},
5    response::IntoResponse,
6    routing::{post, Router},
7    APIHandler, StatusCode,
8};
9use saluki_common::task::spawn_traced_named;
10use serde::Deserialize;
11use tokio::{select, sync::mpsc, time::sleep};
12use tracing::{error, info};
13use tracing_subscriber::{reload::Handle, EnvFilter, Registry};
14
/// Process-wide slot holding the logging API handler.
///
/// The slot starts out empty, is filled by `set_logging_api_handler`, and is emptied again by
/// `acquire_logging_api_handler`, which hands ownership of the handler to the caller.
static API_HANDLER: Mutex<Option<LoggingAPIHandler>> = Mutex::new(None);
16
17pub(super) fn set_logging_api_handler(handler: LoggingAPIHandler) {
18    API_HANDLER.lock().unwrap().replace(handler);
19}
20
21/// Acquires the logging API handler.
22///
23/// This function is mutable, and consumes the handler if it's present. This means it should only be called once, and
24/// only after logging has been initialized via `initialize_dynamic_logging`.
25///
26/// The logging API handler can be used to install API routes which allow dynamically controlling the logging level
27/// filtering. See [`LoggingAPIHandler`] for more information.
28pub fn acquire_logging_api_handler() -> Option<LoggingAPIHandler> {
29    API_HANDLER.lock().unwrap().take()
30}
31
/// Query parameters accepted by the `/logging/override` route.
#[derive(Deserialize)]
struct OverrideQueryParams {
    /// How long the override should stay active, in seconds.
    ///
    /// The override handler rejects values greater than 600.
    time_secs: u64,
}
36
/// State used for the logging API handler.
///
/// This is the state shared with the route handlers, and is cheap to clone as it only wraps a channel sender.
#[derive(Clone)]
pub struct LoggingHandlerState {
    /// Sender half of the channel feeding the background override processor.
    ///
    /// `Some((duration, filter))` asks the processor to apply `filter` for `duration`, while `None` asks it to reset
    /// back to the original filter immediately.
    override_tx: mpsc::Sender<Option<(Duration, EnvFilter)>>,
}
42
/// An API handler for updating log filtering directives at runtime.
///
/// This handler exposes two main routes -- `/logging/override` and `/logging/reset` -- which allow for overriding the
/// default log filtering directives (configured at startup) at runtime, and then resetting them once the override is no
/// longer needed.
///
/// As this has the potential for incredibly verbose logging at runtime, the override is set with a specific duration in
/// which it will apply. Once an override has been active for the configured duration, it will automatically be reset
/// unless the override is refreshed before the duration elapses.
///
/// The maximum duration for an override is 10 minutes.
pub struct LoggingAPIHandler {
    /// Handler state, cloned out as the initial state for the API routes.
    state: LoggingHandlerState,
}
57
58impl LoggingAPIHandler {
59    pub(super) fn new(original_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>) -> Self {
60        // Spawn our background task that will handle override requests.
61        let (override_tx, override_rx) = mpsc::channel(1);
62        spawn_traced_named(
63            "dynamic-logging-override-processor",
64            process_override_requests(original_filter, reload_handle, override_rx),
65        );
66
67        Self {
68            state: LoggingHandlerState { override_tx },
69        }
70    }
71
72    async fn override_handler(
73        State(state): State<LoggingHandlerState>, params: Query<OverrideQueryParams>, body: String,
74    ) -> impl IntoResponse {
75        // Make sure the override length is within the acceptable range.
76        const MAXIMUM_OVERRIDE_LENGTH_SECS: u64 = 600;
77        if params.time_secs > MAXIMUM_OVERRIDE_LENGTH_SECS {
78            return (
79                StatusCode::BAD_REQUEST,
80                format!(
81                    "override time cannot be greater than {} seconds",
82                    MAXIMUM_OVERRIDE_LENGTH_SECS
83                ),
84            );
85        }
86
87        // Parse the override duration and create a new filter from the body.
88        let duration = Duration::from_secs(params.time_secs);
89        let new_filter = match EnvFilter::try_new(body) {
90            Ok(filter) => filter,
91            Err(e) => {
92                return (
93                    StatusCode::BAD_REQUEST,
94                    format!("failed to parse override filter: {}", e),
95                )
96            }
97        };
98
99        // Instruct the override processor to apply the new log filtering directives for the given duration.
100        let _ = state.override_tx.send(Some((duration, new_filter))).await;
101
102        (StatusCode::OK, "acknowledged".to_string())
103    }
104
105    async fn reset_handler(State(state): State<LoggingHandlerState>) {
106        // Instruct the override processor to immediately reset back to the original log filtering directives.
107        let _ = state.override_tx.send(None).await;
108    }
109}
110
111impl APIHandler for LoggingAPIHandler {
112    type State = LoggingHandlerState;
113
114    fn generate_initial_state(&self) -> Self::State {
115        self.state.clone()
116    }
117
118    fn generate_routes(&self) -> Router<Self::State> {
119        Router::new()
120            .route("/logging/override", post(Self::override_handler))
121            .route("/logging/reset", post(Self::reset_handler))
122    }
123}
124
125async fn process_override_requests(
126    original_filter: EnvFilter, reload_handle: Handle<EnvFilter, Registry>,
127    mut rx: mpsc::Receiver<Option<(Duration, EnvFilter)>>,
128) {
129    let mut override_active = false;
130    let override_timeout = sleep(Duration::from_secs(3600));
131
132    tokio::pin!(override_timeout);
133
134    loop {
135        select! {
136            maybe_override = rx.recv() => match maybe_override {
137                Some(Some((duration, new_filter))) => {
138                    info!(directives = %new_filter, "Overriding existing log filtering directives for {} seconds...", duration.as_secs());
139
140                    match reload_handle.reload(new_filter) {
141                        Ok(()) => {
142                            // We were able to successfully reload the filter, so mark ourselves as having an active
143                            // override and update the override timeout.
144                            override_active = true;
145                            override_timeout.as_mut().reset(tokio::time::Instant::now() + duration);
146                        },
147                        Err(e) => error!(error = %e, "Failed to override log filtering directives."),
148                    }
149                },
150
151                Some(None) => {
152                    // We've been instructed to immediately reset the filter back to the original one, so simply update
153                    // the override timeout to fire as soon as possible.
154                    override_timeout.as_mut().reset(tokio::time::Instant::now());
155                },
156
157                // Our sender has dropped, so there's no more override requests for us to handle.
158                None => break,
159            },
160            _ = &mut override_timeout => {
161                // Our override timeout has fired. If we have an active override, reset it back to the original filter.
162                //
163                // Otherwise, this just means that we've been running for a while without any overrides, so we can just
164                // reset the timeout with a long duration.
165                if override_active {
166                    override_active = false;
167
168                    if let Err(e) = reload_handle.reload(original_filter.clone()) {
169                        error!(error = %e, "Failed to reset log filtering directives.");
170                    }
171
172                    info!(directives = %original_filter, "Restored original log filtering directives.");
173                }
174
175                override_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(3600));
176            }
177        }
178    }
179
180    // Reset our filter to the original one before we exit.
181    if let Err(e) = reload_handle.reload(original_filter) {
182        error!(error = %e, "Failed to reset log filtering directives before override handler shutdown.");
183    }
184}