saluki_app/logging/
mod.rs1use std::io::Write;
12
13use saluki_error::{generic_error, GenericError};
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15use tracing_rolling_file::RollingFileAppenderBase;
16use tracing_subscriber::{layer::SubscriberExt as _, reload, util::SubscriberInitExt as _, Layer, Registry};
17
18mod api;
19use self::api::set_logging_api_handler;
20pub use self::api::{acquire_logging_api_handler, LoggingAPIHandler, LoggingOverrideController, LoggingOverrideWorker};
21
22mod config;
23pub use self::config::{LogLevel, LoggingConfiguration};
24
25mod layer;
26use self::layer::{build_formatting_layer, build_syslog_formatting_layer};
27
28mod syslog;
29use self::syslog::SyslogWriter;
30
31const NB_LOG_WRITER_BUFFER_SIZE: usize = 4096;
37
38type OutputStack = Vec<Box<dyn Layer<Registry> + Send + Sync>>;
39
40pub struct LoggingGuard {
47 worker_guards: Vec<WorkerGuard>,
48 stack_handle: reload::Handle<OutputStack, Registry>,
49 controller: LoggingOverrideController,
50}
51
52impl LoggingGuard {
53 pub async fn reload(&mut self, config: LoggingConfiguration) -> Result<(), GenericError> {
70 let (new_stack, new_guards) = build_output_stack(&config)?;
71 let new_filter = config.log_level.as_env_filter();
72
73 self.stack_handle
74 .reload(new_stack)
75 .map_err(|e| generic_error!("Failed to swap logging output stack: {}", e))?;
76 self.controller.update_base(new_filter).await?;
77
78 let _old_guards = std::mem::replace(&mut self.worker_guards, new_guards);
81
82 Ok(())
83 }
84
85 pub fn controller(&self) -> LoggingOverrideController {
87 self.controller.clone()
88 }
89}
90
91pub(crate) async fn initialize_logging(
105 config: LoggingConfiguration,
106) -> Result<(LoggingGuard, LoggingOverrideWorker), GenericError> {
107 let (output_stack, worker_guards) = build_output_stack(&config)?;
110 let (output_layer, stack_handle) = reload::Layer::new(output_stack);
111
112 let level_filter = config.log_level.as_env_filter();
114 let (filter_layer, filter_handle) = reload::Layer::new(level_filter.clone());
115
116 let (override_worker, controller) = LoggingOverrideWorker::new(level_filter, filter_handle);
121 set_logging_api_handler(LoggingAPIHandler::new(controller.clone()));
122
123 tracing_subscriber::registry()
124 .with(output_layer.with_filter(filter_layer))
125 .try_init()?;
126
127 Ok((
128 LoggingGuard {
129 worker_guards,
130 stack_handle,
131 controller,
132 },
133 override_worker,
134 ))
135}
136
137fn build_output_stack(config: &LoggingConfiguration) -> Result<(OutputStack, Vec<WorkerGuard>), GenericError> {
138 let mut layers: OutputStack = Vec::new();
139 let mut guards = Vec::new();
140
141 if config.log_to_console {
142 let (nb_stdout, guard) = writer_to_nonblocking("console", std::io::stdout());
143 guards.push(guard);
144 layers.push(build_formatting_layer(config, nb_stdout));
145 }
146
147 if !config.log_file.is_empty() {
148 let appender_builder = RollingFileAppenderBase::builder();
149 let appender = appender_builder
150 .filename(config.log_file.clone())
151 .max_filecount(config.log_file_max_rolls)
152 .condition_max_file_size(config.log_file_max_size.as_u64())
153 .build()
154 .map_err(|e| generic_error!("Failed to build log file appender: {}", e))?;
155
156 let (nb_appender, guard) = writer_to_nonblocking("file", appender);
157 guards.push(guard);
158 layers.push(build_formatting_layer(config, nb_appender));
159 }
160
161 if config.log_to_syslog {
162 let syslog_writer = SyslogWriter::from_uri(&config.syslog_uri)
163 .map_err(|e| generic_error!("Failed to build syslog log writer: {}", e))?;
164 let (nb_syslog, guard) = writer_to_nonblocking("syslog", syslog_writer);
167 guards.push(guard);
168 layers.push(build_syslog_formatting_layer(config, nb_syslog));
169 }
170
171 Ok((layers, guards))
172}
173
174fn writer_to_nonblocking<W>(writer_name: &'static str, writer: W) -> (NonBlocking, WorkerGuard)
175where
176 W: Write + Send + 'static,
177{
178 let thread_name = format!("log-writer-{}", writer_name);
179 NonBlockingBuilder::default()
180 .thread_name(&thread_name)
181 .buffered_lines_limit(NB_LOG_WRITER_BUFFER_SIZE)
182 .lossy(true)
183 .finish(writer)
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Syslog URI used by tests that need a syntactically valid UDP target.
    const TEST_SYSLOG_URI: &str = "udp://127.0.0.1:9";

    /// Returns a configuration with console, file, and syslog outputs all turned off.
    fn logging_config_without_outputs() -> LoggingConfiguration {
        let mut config = LoggingConfiguration::simple();
        config.log_to_console = false;
        config.log_to_syslog = false;
        config.log_file.clear();
        config.syslog_uri.clear();
        config
    }

    /// Returns a configuration whose only enabled output is syslog, pointed at `uri`.
    fn logging_config_with_syslog(uri: &str) -> LoggingConfiguration {
        let mut config = logging_config_without_outputs();
        config.syslog_uri = uri.to_owned();
        config.log_to_syslog = true;
        config
    }

    #[test]
    fn output_stack_skips_syslog_when_disabled() {
        let (layers, guards) =
            build_output_stack(&logging_config_without_outputs()).expect("build output stack");

        assert!(layers.is_empty());
        assert!(guards.is_empty());
    }

    #[test]
    fn output_stack_adds_syslog_layer_and_guard_when_enabled() {
        let (layers, guards) = build_output_stack(&logging_config_with_syslog(TEST_SYSLOG_URI))
            .expect("build output stack with syslog");

        assert_eq!(layers.len(), 1);
        assert_eq!(guards.len(), 1);
    }

    #[test]
    fn output_stack_fails_when_syslog_uri_is_invalid() {
        let config = logging_config_with_syslog("http://127.0.0.1:514");

        // The success type is not `Debug`, so go through `Option` instead of `expect_err`.
        let message = build_output_stack(&config)
            .err()
            .expect("invalid syslog URI should fail output stack build")
            .to_string();

        assert!(message.contains("Failed to build syslog log writer"));
        assert!(message.contains("Unsupported syslog URI scheme 'http'"));
    }

    #[tokio::test]
    async fn reload_can_enable_change_disable_syslog_and_preserves_previous_stack_on_invalid_config() {
        use saluki_core::runtime::{ProcessShutdown, Supervisable as _};

        // Assemble a guard by hand, mirroring `initialize_logging` without installing a global
        // subscriber.
        let initial_config = logging_config_without_outputs();
        let (initial_stack, worker_guards) =
            build_output_stack(&initial_config).expect("build initial output stack");
        let (output_layer, stack_handle) = reload::Layer::new(initial_stack);
        let base_filter = initial_config.log_level.as_env_filter();
        let (filter_layer, filter_handle) = reload::Layer::new(base_filter.clone());
        let (override_worker, controller) = LoggingOverrideWorker::new(base_filter, filter_handle);
        let mut guard = LoggingGuard {
            worker_guards,
            stack_handle,
            controller,
        };
        let _keep_layers_alive = (output_layer, filter_layer);

        // Run the override worker in the background for the duration of the test.
        let worker_task = override_worker
            .initialize(ProcessShutdown::noop())
            .await
            .expect("worker init");
        let worker_handle = tokio::spawn(worker_task);

        // Each step: the configuration to apply, the message if the reload fails, and the number
        // of worker guards expected afterwards.
        let successful_reloads = [
            (logging_config_with_syslog(TEST_SYSLOG_URI), "reload should enable syslog", 1),
            (
                logging_config_with_syslog("udp://127.0.0.1:10"),
                "reload should change syslog URI",
                1,
            ),
            (logging_config_without_outputs(), "reload should disable syslog", 0),
            (logging_config_with_syslog(TEST_SYSLOG_URI), "reload should re-enable syslog", 1),
        ];
        for (next_config, failure_message, expected_guards) in successful_reloads {
            guard.reload(next_config).await.expect(failure_message);
            assert_eq!(guard.worker_guards.len(), expected_guards);
        }

        // A configuration that cannot be built must fail and leave the current guards in place.
        let error = guard
            .reload(logging_config_with_syslog("http://127.0.0.1:514"))
            .await
            .expect_err("invalid syslog URI should fail reload");
        assert!(error.to_string().contains("Failed to build syslog log writer"));
        assert_eq!(guard.worker_guards.len(), 1);

        // The spawned worker is expected to finish once the guard has been dropped.
        drop(guard);
        worker_handle
            .await
            .expect("override worker should exit cleanly")
            .expect("override worker should not error");
    }
}