// saluki_app/logging/mod.rs
//! Logging: initialization of the `tracing` subscriber plus runtime reconfiguration of its outputs and filters.

// TODO: `AgentLikeFieldVisitor` currently allocates a `String` to hold the message field whenever it finds one. This
// is suboptimal because it means we allocate a string every single time we log a message. Logging is rare, but it's
// still a recipe for small, unnecessary allocations over time... and it makes enabling debug/trace-level logging in
// production that much more expensive.
//
// We might consider _something_ like a string pool in the future, but we can defer that until we have a better idea
// of what the potential impact is in practice.

11use std::io::Write;
12
13use saluki_error::{generic_error, GenericError};
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15use tracing_rolling_file::RollingFileAppenderBase;
16use tracing_subscriber::{layer::SubscriberExt as _, reload, util::SubscriberInitExt as _, Layer, Registry};
17
18mod api;
19pub use self::api::{LoggingAPIHandler, LoggingOverrideController, LoggingOverrideWorker};
20
21mod config;
22pub use self::config::{LogLevel, LoggingConfiguration};
23
24mod layer;
25use self::layer::{build_formatting_layer, build_syslog_formatting_layer};
26
27mod syslog;
28use self::syslog::SyslogWriter;
29
// Number of buffered lines in each non-blocking log writer.
//
// This directly influences idle memory usage: each logging backend (console, file, syslog) gets its own bounded
// channel that can hold this many elements, and each element is roughly 32 bytes. The current value of 4,096
// elements/lines therefore consumes a minimum of ~128KB per enabled backend.
const NB_LOG_WRITER_BUFFER_SIZE: usize = 4096;

37type OutputStack = Vec<Box<dyn Layer<Registry> + Send + Sync>>;
38
39/// A handle to the dynamic logging subsystem.
40///
41/// Held by [`BootstrapGuard`][crate::bootstrap::BootstrapGuard] for the lifetime of the application. Owns the
42/// worker guards (which flush buffered output on drop), and exposes [`reload`][Self::reload] for swapping the
43/// entire logging configuration plus [`controller`][Self::controller] for driving runtime filter changes through
44/// the override worker.
45pub struct LoggingGuard {
46    worker_guards: Vec<WorkerGuard>,
47    stack_handle: reload::Handle<OutputStack, Registry>,
48    controller: LoggingOverrideController,
49}
50
51impl LoggingGuard {
52    /// Reloads the logging subsystem from the given configuration.
53    ///
54    /// Rebuilds the output layer stack from `config` and routes the new level filter through the override worker
55    /// as the new base filter, so an active override is preserved (the new base will take effect once the override
56    /// expires). Worker guards for the previous outputs are dropped after the swap, which flushes any buffered log
57    /// lines to their original destinations.
58    ///
59    /// This is the right entry point when the entire logging configuration may have changed (for example, outputs,
60    /// format, level). For runtime base-filter changes only -- such as following a `log_level` config update --
61    /// use [`controller`][Self::controller] and call
62    /// [`update_base`][LoggingOverrideController::update_base] directly.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if the new output layers can't be constructed (for example, the configured log file path is
67    /// inaccessible) or if the override worker is no longer running.
68    pub async fn reload(&mut self, config: LoggingConfiguration) -> Result<(), GenericError> {
69        let (new_stack, new_guards) = build_output_stack(&config)?;
70        let new_filter = config.log_level.as_env_filter();
71
72        self.stack_handle
73            .reload(new_stack)
74            .map_err(|e| generic_error!("Failed to swap logging output stack: {}", e))?;
75        self.controller.update_base(new_filter).await?;
76
77        // Drop the old worker guards _after_ the swap so any buffered lines are flushed to their original destinations
78        // before the worker threads exit.
79        let _old_guards = std::mem::replace(&mut self.worker_guards, new_guards);
80
81        Ok(())
82    }
83
84    /// Returns a logging override controller that can be used to change the default filter directives.
85    pub fn controller(&self) -> LoggingOverrideController {
86        self.controller.clone()
87    }
88}
89
90/// Initializes the logging subsystem for `tracing` with the ability to dynamically update the log filtering directives
91/// at runtime.
92///
93/// Returns a [`LoggingGuard`] which must be held until the application is about to shutdown, plus a
94/// [`LoggingOverrideWorker`] that must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] to drive
95/// the dynamic override processor; the worker also asserts the privileged API routes for runtime filter control.
96/// Without the worker running, override requests are accepted but never applied.
97///
98/// # Errors
99///
100/// If the logging subsystem was already initialized, an error will be returned.
101pub(crate) async fn initialize_logging(
102    config: LoggingConfiguration,
103) -> Result<(LoggingGuard, LoggingOverrideWorker), GenericError> {
104    // Build the initial output stack from the supplied configuration. This is later swappable via
105    // `BootstrapGuard::reload_logging` once the Datadog Agent provides authoritative configuration.
106    let (output_stack, worker_guards) = build_output_stack(&config)?;
107    let (output_layer, stack_handle) = reload::Layer::new(output_stack);
108
109    // Set up our log level filtering and dynamic filter layer.
110    let (filter_layer, filter_handle) = reload::Layer::new(config.log_level.as_env_filter());
111
112    // The override worker owns the canonical base filter -- the directives the system restores to after an override
113    // expires or is reset. It seeds the base from the reload handle on startup and is updated via the controller,
114    // both by `LoggingGuard::reload` once the Agent's configuration is applied and by any other caller (for example, a
115    // runtime `log_level` watcher) wired up via [`LoggingGuard::controller`].
116    let (override_worker, controller) = LoggingOverrideWorker::new(filter_handle);
117
118    tracing_subscriber::registry()
119        .with(output_layer.with_filter(filter_layer))
120        .try_init()?;
121
122    Ok((
123        LoggingGuard {
124            worker_guards,
125            stack_handle,
126            controller,
127        },
128        override_worker,
129    ))
130}
131
132fn build_output_stack(config: &LoggingConfiguration) -> Result<(OutputStack, Vec<WorkerGuard>), GenericError> {
133    let mut layers: OutputStack = Vec::new();
134    let mut guards = Vec::new();
135
136    if config.log_to_console {
137        let (nb_stdout, guard) = writer_to_nonblocking("console", std::io::stdout());
138        guards.push(guard);
139        layers.push(build_formatting_layer(config, nb_stdout));
140    }
141
142    if !config.log_file.is_empty() {
143        let appender_builder = RollingFileAppenderBase::builder();
144        let appender = appender_builder
145            .filename(config.log_file.clone())
146            .max_filecount(config.log_file_max_rolls)
147            .condition_max_file_size(config.log_file_max_size.as_u64())
148            .build()
149            .map_err(|e| generic_error!("Failed to build log file appender: {}", e))?;
150
151        let (nb_appender, guard) = writer_to_nonblocking("file", appender);
152        guards.push(guard);
153        layers.push(build_formatting_layer(config, nb_appender));
154    }
155
156    if config.log_to_syslog {
157        let syslog_writer = SyslogWriter::from_uri(&config.syslog_uri)
158            .map_err(|e| generic_error!("Failed to build syslog log writer: {}", e))?;
159        // Keep syslog on the same lossy non-blocking path as console/file so logging never
160        // backpressures ADP.
161        let (nb_syslog, guard) = writer_to_nonblocking("syslog", syslog_writer);
162        guards.push(guard);
163        layers.push(build_syslog_formatting_layer(config, nb_syslog));
164    }
165
166    Ok((layers, guards))
167}
168
169fn writer_to_nonblocking<W>(writer_name: &'static str, writer: W) -> (NonBlocking, WorkerGuard)
170where
171    W: Write + Send + 'static,
172{
173    let thread_name = format!("log-writer-{}", writer_name);
174    NonBlockingBuilder::default()
175        .thread_name(&thread_name)
176        .buffered_lines_limit(NB_LOG_WRITER_BUFFER_SIZE)
177        .lossy(true)
178        .finish(writer)
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Syntactically valid syslog target used by tests that need syslog enabled.
    const TEST_SYSLOG_URI: &str = "udp://127.0.0.1:9";

    #[test]
    fn output_stack_skips_syslog_when_disabled() {
        let (layers, guards) =
            build_output_stack(&logging_config_without_outputs()).expect("build output stack");

        assert!(layers.is_empty());
        assert!(guards.is_empty());
    }

    #[test]
    fn output_stack_adds_syslog_layer_and_guard_when_enabled() {
        let syslog_config = logging_config_with_syslog(TEST_SYSLOG_URI);

        let (layers, guards) = build_output_stack(&syslog_config).expect("build output stack with syslog");

        assert_eq!((layers.len(), guards.len()), (1, 1));
    }

    #[test]
    fn output_stack_fails_when_syslog_uri_is_invalid() {
        let bad_config = logging_config_with_syslog("http://127.0.0.1:514");

        // `OutputStack` isn't `Debug`, so `expect_err` is unavailable; go through `Result::err` instead.
        let message = build_output_stack(&bad_config)
            .err()
            .expect("invalid syslog URI should fail output stack build")
            .to_string();

        assert!(message.contains("Failed to build syslog log writer"));
        assert!(message.contains("Unsupported syslog URI scheme 'http'"));
    }

    #[tokio::test]
    async fn reload_can_enable_change_disable_syslog_and_preserves_previous_stack_on_invalid_config() {
        use saluki_core::runtime::Supervisor;
        use tokio::sync::oneshot;

        // Assemble a `LoggingGuard` by hand, mirroring `initialize_logging` but without installing a global subscriber.
        let initial_config = logging_config_without_outputs();
        let (initial_stack, initial_guards) =
            build_output_stack(&initial_config).expect("build initial output stack");
        let (output_layer, stack_handle) = reload::Layer::new(initial_stack);
        let (filter_layer, filter_handle) = reload::Layer::new(initial_config.log_level.as_env_filter());
        let (override_worker, controller) = LoggingOverrideWorker::new(filter_handle);
        let mut guard = LoggingGuard {
            worker_guards: initial_guards,
            stack_handle,
            controller,
        };
        // The reload handles don't keep their layers alive on their own, so hold the layers for the whole test.
        let _keep_layers_alive = (output_layer, filter_layer);

        // Run the override worker inside a Supervisor so it has the dataspace context its route assertion needs.
        // The supervisor's shutdown signal is what drives the worker to exit cleanly at the end.
        let mut supervisor = Supervisor::new("test-logging-override").expect("create supervisor");
        supervisor.add_worker(override_worker);
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let supervisor_task = tokio::spawn(async move { supervisor.run_with_shutdown(shutdown_rx).await });

        // Each step reloads a configuration, then checks how many writer worker guards are held afterwards.
        let steps: [(LoggingConfiguration, usize, &str); 4] = [
            (logging_config_with_syslog(TEST_SYSLOG_URI), 1, "reload should enable syslog"),
            (logging_config_with_syslog("udp://127.0.0.1:10"), 1, "reload should change syslog URI"),
            (logging_config_without_outputs(), 0, "reload should disable syslog"),
            (logging_config_with_syslog(TEST_SYSLOG_URI), 1, "reload should re-enable syslog"),
        ];
        for (step_config, expected_guards, context) in steps {
            guard.reload(step_config).await.expect(context);
            assert_eq!(guard.worker_guards.len(), expected_guards);
        }

        // An invalid configuration must fail before anything is swapped, leaving the previous outputs and guards alone.
        let reload_error = guard
            .reload(logging_config_with_syslog("http://127.0.0.1:514"))
            .await
            .expect_err("invalid syslog URI should fail reload");
        assert!(reload_error.to_string().contains("Failed to build syslog log writer"));
        assert_eq!(guard.worker_guards.len(), 1);

        // Trigger supervisor shutdown explicitly, so the worker leaves through its shutdown branch rather than the
        // channel-close path (which would prompt the supervisor to attempt a restart).
        shutdown_tx.send(()).expect("send shutdown");
        supervisor_task
            .await
            .expect("supervisor task joins")
            .expect("supervisor should exit cleanly");
        drop(guard);
    }

    /// Returns a configuration with every output (console, file, syslog) disabled.
    fn logging_config_without_outputs() -> LoggingConfiguration {
        let mut config = LoggingConfiguration::simple();
        config.log_to_syslog = false;
        config.syslog_uri.clear();
        config.log_to_console = false;
        config.log_file.clear();
        config
    }

    /// Returns a configuration whose only enabled output is syslog, pointed at `uri`.
    fn logging_config_with_syslog(uri: &str) -> LoggingConfiguration {
        let mut config = logging_config_without_outputs();
        config.syslog_uri = uri.to_owned();
        config.log_to_syslog = true;
        config
    }
}