Skip to main content

saluki_app/logging/
mod.rs

1//! Logging.
2
3// TODO: `AgentLikeFieldVisitor` currently allocates a `String` to hold the message field when it finds it. This is
4// suboptimal because it means we allocate a string literally every time we log a message. Logging is rare, but it's
5// just a recipe for small, unnecessary allocations over time... and makes it that much more inefficient to enable
6// debug/trace-level logging in production.
7//
8// We might consider _something_ like a string pool in the future, but we can defer that until we have a better idea of
9// what the potential impact is in practice.
10
11use std::io::Write;
12
13use saluki_error::{generic_error, GenericError};
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15use tracing_rolling_file::RollingFileAppenderBase;
16use tracing_subscriber::{layer::SubscriberExt as _, reload, util::SubscriberInitExt as _, Layer, Registry};
17
18mod api;
19use self::api::set_logging_api_handler;
20pub use self::api::{acquire_logging_api_handler, LoggingAPIHandler, LoggingOverrideController, LoggingOverrideWorker};
21
22mod config;
23pub use self::config::{LogLevel, LoggingConfiguration};
24
25mod layer;
26use self::layer::{build_formatting_layer, build_syslog_formatting_layer};
27
28mod syslog;
29use self::syslog::SyslogWriter;
30
/// Number of buffered lines in each non-blocking log writer.
///
/// This directly influences idle memory usage. Each logging backend (console, file, syslog) gets its own bounded
/// channel that can hold this many elements/lines. Each element is roughly 32 bytes, so at 4,096 lines a single
/// enabled backend consumes a minimum of ~128KB (4,096 x 32 bytes).
///
/// The writers are configured as lossy (see `writer_to_nonblocking`). Once a channel is full, further lines are
/// dropped instead of blocking the logging call site.
const NB_LOG_WRITER_BUFFER_SIZE: usize = 4096;
37
38type OutputStack = Vec<Box<dyn Layer<Registry> + Send + Sync>>;
39
40/// A handle to the dynamic logging subsystem.
41///
42/// Held by [`BootstrapGuard`][crate::bootstrap::BootstrapGuard] for the lifetime of the application. Owns the
43/// worker guards (which flush buffered output on drop), and exposes [`reload`][Self::reload] for swapping the
44/// entire logging configuration plus [`controller`][Self::controller] for driving runtime filter changes through
45/// the override worker.
46pub struct LoggingGuard {
47    worker_guards: Vec<WorkerGuard>,
48    stack_handle: reload::Handle<OutputStack, Registry>,
49    controller: LoggingOverrideController,
50}
51
52impl LoggingGuard {
53    /// Reloads the logging subsystem from the given configuration.
54    ///
55    /// Rebuilds the output layer stack from `config` and routes the new level filter through the override worker
56    /// as the new base filter, so an active override is preserved (the new base will take effect once the override
57    /// expires). Worker guards for the previous outputs are dropped after the swap, which flushes any buffered log
58    /// lines to their original destinations.
59    ///
60    /// This is the right entry point when the entire logging configuration may have changed (e.g., outputs,
61    /// format, level). For runtime base-filter changes only -- such as following a `log_level` config update --
62    /// use [`controller`][Self::controller] and call
63    /// [`update_base`][LoggingOverrideController::update_base] directly.
64    ///
65    /// # Errors
66    ///
67    /// Returns an error if the new output layers cannot be constructed (e.g., the configured log file path is
68    /// inaccessible) or if the override worker is no longer running.
69    pub async fn reload(&mut self, config: LoggingConfiguration) -> Result<(), GenericError> {
70        let (new_stack, new_guards) = build_output_stack(&config)?;
71        let new_filter = config.log_level.as_env_filter();
72
73        self.stack_handle
74            .reload(new_stack)
75            .map_err(|e| generic_error!("Failed to swap logging output stack: {}", e))?;
76        self.controller.update_base(new_filter).await?;
77
78        // Drop the old worker guards _after_ the swap so any buffered lines are flushed to their original destinations
79        // before the worker threads exit.
80        let _old_guards = std::mem::replace(&mut self.worker_guards, new_guards);
81
82        Ok(())
83    }
84
85    /// Returns a logging override controller that can be used to change the default filter directives.
86    pub fn controller(&self) -> LoggingOverrideController {
87        self.controller.clone()
88    }
89}
90
91/// Initializes the logging subsystem for `tracing` with the ability to dynamically update the log filtering directives
92/// at runtime.
93///
94/// An API handler can be acquired (via [`acquire_logging_api_handler`]) to install the API routes which allow for
95/// dynamically controlling the logging level filtering. See [`LoggingAPIHandler`] for more information.
96///
97/// Returns a [`LoggingGuard`] which must be held until the application is about to shutdown, plus a
98/// [`LoggingOverrideWorker`] that must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] to drive
99/// the dynamic override processor; without the worker running, override requests are accepted but never applied.
100///
101/// # Errors
102///
103/// If the logging subsystem was already initialized, an error will be returned.
104pub(crate) async fn initialize_logging(
105    config: LoggingConfiguration,
106) -> Result<(LoggingGuard, LoggingOverrideWorker), GenericError> {
107    // Build the initial output stack from the supplied configuration. This is later swappable via
108    // `BootstrapGuard::reload_logging` once the Datadog Agent provides authoritative configuration.
109    let (output_stack, worker_guards) = build_output_stack(&config)?;
110    let (output_layer, stack_handle) = reload::Layer::new(output_stack);
111
112    // Set up our log level filtering and dynamic filter layer.
113    let level_filter = config.log_level.as_env_filter();
114    let (filter_layer, filter_handle) = reload::Layer::new(level_filter.clone());
115
116    // The override worker owns the canonical base filter -- the directives the system restores to after an override
117    // expires or is reset. It starts as the bootstrap level and is updated via the controller, both by
118    // `LoggingGuard::reload` once the Agent's configuration is applied and by any other caller (e.g. a runtime
119    // `log_level` watcher) wired up via [`LoggingGuard::controller`].
120    let (override_worker, controller) = LoggingOverrideWorker::new(level_filter, filter_handle);
121    set_logging_api_handler(LoggingAPIHandler::new(controller.clone()));
122
123    tracing_subscriber::registry()
124        .with(output_layer.with_filter(filter_layer))
125        .try_init()?;
126
127    Ok((
128        LoggingGuard {
129            worker_guards,
130            stack_handle,
131            controller,
132        },
133        override_worker,
134    ))
135}
136
137fn build_output_stack(config: &LoggingConfiguration) -> Result<(OutputStack, Vec<WorkerGuard>), GenericError> {
138    let mut layers: OutputStack = Vec::new();
139    let mut guards = Vec::new();
140
141    if config.log_to_console {
142        let (nb_stdout, guard) = writer_to_nonblocking("console", std::io::stdout());
143        guards.push(guard);
144        layers.push(build_formatting_layer(config, nb_stdout));
145    }
146
147    if !config.log_file.is_empty() {
148        let appender_builder = RollingFileAppenderBase::builder();
149        let appender = appender_builder
150            .filename(config.log_file.clone())
151            .max_filecount(config.log_file_max_rolls)
152            .condition_max_file_size(config.log_file_max_size.as_u64())
153            .build()
154            .map_err(|e| generic_error!("Failed to build log file appender: {}", e))?;
155
156        let (nb_appender, guard) = writer_to_nonblocking("file", appender);
157        guards.push(guard);
158        layers.push(build_formatting_layer(config, nb_appender));
159    }
160
161    if config.log_to_syslog {
162        let syslog_writer = SyslogWriter::from_uri(&config.syslog_uri)
163            .map_err(|e| generic_error!("Failed to build syslog log writer: {}", e))?;
164        // Keep syslog on the same lossy non-blocking path as console/file so logging never
165        // backpressures ADP.
166        let (nb_syslog, guard) = writer_to_nonblocking("syslog", syslog_writer);
167        guards.push(guard);
168        layers.push(build_syslog_formatting_layer(config, nb_syslog));
169    }
170
171    Ok((layers, guards))
172}
173
174fn writer_to_nonblocking<W>(writer_name: &'static str, writer: W) -> (NonBlocking, WorkerGuard)
175where
176    W: Write + Send + 'static,
177{
178    let thread_name = format!("log-writer-{}", writer_name);
179    NonBlockingBuilder::default()
180        .thread_name(&thread_name)
181        .buffered_lines_limit(NB_LOG_WRITER_BUFFER_SIZE)
182        .lossy(true)
183        .finish(writer)
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Syslog target for the happy-path tests (UDP port 9 is the "discard" service).
    const TEST_SYSLOG_URI: &str = "udp://127.0.0.1:9";

    #[test]
    fn output_stack_skips_syslog_when_disabled() {
        let config = logging_config_without_outputs();

        let (layers, guards) = build_output_stack(&config).expect("build output stack");

        assert!(layers.is_empty());
        assert!(guards.is_empty());
    }

    #[test]
    fn output_stack_adds_syslog_layer_and_guard_when_enabled() {
        let config = logging_config_with_syslog(TEST_SYSLOG_URI);

        let (layers, guards) = build_output_stack(&config).expect("build output stack with syslog");

        assert_eq!((layers.len(), guards.len()), (1, 1));
    }

    #[test]
    fn output_stack_fails_when_syslog_uri_is_invalid() {
        let config = logging_config_with_syslog("http://127.0.0.1:514");

        // `expect_err` would need the `Ok` payload to be `Debug`, which the boxed layers are not.
        let message = build_output_stack(&config)
            .err()
            .expect("invalid syslog URI should fail output stack build")
            .to_string();

        assert!(message.contains("Failed to build syslog log writer"));
        assert!(message.contains("Unsupported syslog URI scheme 'http'"));
    }

    #[tokio::test]
    async fn reload_can_enable_change_disable_syslog_and_preserves_previous_stack_on_invalid_config() {
        use saluki_core::runtime::{ProcessShutdown, Supervisable as _};

        // Assemble a guard by hand, mirroring `initialize_logging`, but without installing a global subscriber.
        let initial_config = logging_config_without_outputs();
        let (output_stack, worker_guards) = build_output_stack(&initial_config).expect("build initial output stack");
        let (output_layer, stack_handle) = reload::Layer::new(output_stack);
        let level_filter = initial_config.log_level.as_env_filter();
        let (filter_layer, filter_handle) = reload::Layer::new(level_filter.clone());
        let (override_worker, controller) = LoggingOverrideWorker::new(level_filter, filter_handle);
        let mut guard = LoggingGuard {
            worker_guards,
            stack_handle,
            controller,
        };

        // Reload handles only reach their layers while those layers are alive, so hold onto them for the whole test.
        let _keep_layers_alive = (output_layer, filter_layer);

        // Run the override worker in the background so the `update_base` calls made by `reload` don't block on a
        // full channel.
        let worker_task = tokio::spawn(
            override_worker
                .initialize(ProcessShutdown::noop())
                .await
                .expect("worker init"),
        );

        // Each step: the syslog URI to enable (or `None` for no outputs), the expected guard count, and the context.
        let steps: [(Option<&str>, usize, &str); 4] = [
            (Some(TEST_SYSLOG_URI), 1, "reload should enable syslog"),
            (Some("udp://127.0.0.1:10"), 1, "reload should change syslog URI"),
            (None, 0, "reload should disable syslog"),
            (Some(TEST_SYSLOG_URI), 1, "reload should re-enable syslog"),
        ];
        for (syslog_uri, expected_guards, context) in steps {
            let step_config = match syslog_uri {
                Some(uri) => logging_config_with_syslog(uri),
                None => logging_config_without_outputs(),
            };
            guard.reload(step_config).await.expect(context);
            assert_eq!(guard.worker_guards.len(), expected_guards);
        }

        // A failed reload must leave the previously installed (syslog-enabled) stack and its guard in place.
        let reload_error = guard
            .reload(logging_config_with_syslog("http://127.0.0.1:514"))
            .await
            .expect_err("invalid syslog URI should fail reload");
        assert!(reload_error.to_string().contains("Failed to build syslog log writer"));
        assert_eq!(guard.worker_guards.len(), 1);

        // Dropping the guard closes the controller channel, which lets the worker exit.
        drop(guard);
        worker_task
            .await
            .expect("override worker should exit cleanly")
            .expect("override worker should not error");
    }

    /// Returns a configuration with every output (console, file, syslog) disabled.
    fn logging_config_without_outputs() -> LoggingConfiguration {
        let mut config = LoggingConfiguration::simple();
        config.log_to_console = false;
        config.log_to_syslog = false;
        config.log_file.clear();
        config.syslog_uri.clear();
        config
    }

    /// Returns a configuration whose only enabled output is syslog, pointed at `uri`.
    fn logging_config_with_syslog(uri: &str) -> LoggingConfiguration {
        let mut config = logging_config_without_outputs();
        config.syslog_uri = uri.to_owned();
        config.log_to_syslog = true;
        config
    }
}