saluki_app/logging/
mod.rs1use std::io::Write;
12
13use saluki_error::{generic_error, GenericError};
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15use tracing_rolling_file::RollingFileAppenderBase;
16use tracing_subscriber::{layer::SubscriberExt as _, reload, util::SubscriberInitExt as _, Layer, Registry};
17
18mod api;
19pub use self::api::{LoggingAPIHandler, LoggingOverrideController, LoggingOverrideWorker};
20
21mod config;
22pub use self::config::{LogLevel, LoggingConfiguration};
23
24mod layer;
25use self::layer::{build_formatting_layer, build_syslog_formatting_layer};
26
27mod syslog;
28use self::syslog::SyslogWriter;
29
/// Maximum number of log lines a non-blocking writer may hold in its queue.
///
/// Despite the name, this is a line count rather than a byte size: it is passed to
/// `NonBlockingBuilder::buffered_lines_limit` in `writer_to_nonblocking`.
const NB_LOG_WRITER_BUFFER_SIZE: usize = 4_096;
36
37type OutputStack = Vec<Box<dyn Layer<Registry> + Send + Sync>>;
38
39pub struct LoggingGuard {
46 worker_guards: Vec<WorkerGuard>,
47 stack_handle: reload::Handle<OutputStack, Registry>,
48 controller: LoggingOverrideController,
49}
50
51impl LoggingGuard {
52 pub async fn reload(&mut self, config: LoggingConfiguration) -> Result<(), GenericError> {
69 let (new_stack, new_guards) = build_output_stack(&config)?;
70 let new_filter = config.log_level.as_env_filter();
71
72 self.stack_handle
73 .reload(new_stack)
74 .map_err(|e| generic_error!("Failed to swap logging output stack: {}", e))?;
75 self.controller.update_base(new_filter).await?;
76
77 let _old_guards = std::mem::replace(&mut self.worker_guards, new_guards);
80
81 Ok(())
82 }
83
84 pub fn controller(&self) -> LoggingOverrideController {
86 self.controller.clone()
87 }
88}
89
90pub(crate) async fn initialize_logging(
102 config: LoggingConfiguration,
103) -> Result<(LoggingGuard, LoggingOverrideWorker), GenericError> {
104 let (output_stack, worker_guards) = build_output_stack(&config)?;
107 let (output_layer, stack_handle) = reload::Layer::new(output_stack);
108
109 let (filter_layer, filter_handle) = reload::Layer::new(config.log_level.as_env_filter());
111
112 let (override_worker, controller) = LoggingOverrideWorker::new(filter_handle);
117
118 tracing_subscriber::registry()
119 .with(output_layer.with_filter(filter_layer))
120 .try_init()?;
121
122 Ok((
123 LoggingGuard {
124 worker_guards,
125 stack_handle,
126 controller,
127 },
128 override_worker,
129 ))
130}
131
132fn build_output_stack(config: &LoggingConfiguration) -> Result<(OutputStack, Vec<WorkerGuard>), GenericError> {
133 let mut layers: OutputStack = Vec::new();
134 let mut guards = Vec::new();
135
136 if config.log_to_console {
137 let (nb_stdout, guard) = writer_to_nonblocking("console", std::io::stdout());
138 guards.push(guard);
139 layers.push(build_formatting_layer(config, nb_stdout));
140 }
141
142 if !config.log_file.is_empty() {
143 let appender_builder = RollingFileAppenderBase::builder();
144 let appender = appender_builder
145 .filename(config.log_file.clone())
146 .max_filecount(config.log_file_max_rolls)
147 .condition_max_file_size(config.log_file_max_size.as_u64())
148 .build()
149 .map_err(|e| generic_error!("Failed to build log file appender: {}", e))?;
150
151 let (nb_appender, guard) = writer_to_nonblocking("file", appender);
152 guards.push(guard);
153 layers.push(build_formatting_layer(config, nb_appender));
154 }
155
156 if config.log_to_syslog {
157 let syslog_writer = SyslogWriter::from_uri(&config.syslog_uri)
158 .map_err(|e| generic_error!("Failed to build syslog log writer: {}", e))?;
159 let (nb_syslog, guard) = writer_to_nonblocking("syslog", syslog_writer);
162 guards.push(guard);
163 layers.push(build_syslog_formatting_layer(config, nb_syslog));
164 }
165
166 Ok((layers, guards))
167}
168
169fn writer_to_nonblocking<W>(writer_name: &'static str, writer: W) -> (NonBlocking, WorkerGuard)
170where
171 W: Write + Send + 'static,
172{
173 let thread_name = format!("log-writer-{}", writer_name);
174 NonBlockingBuilder::default()
175 .thread_name(&thread_name)
176 .buffered_lines_limit(NB_LOG_WRITER_BUFFER_SIZE)
177 .lossy(true)
178 .finish(writer)
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Syslog URI with a supported scheme, used wherever a buildable syslog output is needed.
    const TEST_SYSLOG_URI: &str = "udp://127.0.0.1:9";

    /// Returns a configuration with console, file, and syslog outputs all switched off.
    fn logging_config_without_outputs() -> LoggingConfiguration {
        let mut cfg = LoggingConfiguration::simple();
        cfg.log_to_console = false;
        cfg.log_to_syslog = false;
        cfg.log_file.clear();
        cfg.syslog_uri.clear();
        cfg
    }

    /// Returns a configuration whose only enabled output is syslog, pointed at `uri`.
    fn logging_config_with_syslog(uri: &str) -> LoggingConfiguration {
        let mut cfg = logging_config_without_outputs();
        cfg.syslog_uri = uri.to_owned();
        cfg.log_to_syslog = true;
        cfg
    }

    #[test]
    fn output_stack_skips_syslog_when_disabled() {
        let (layers, guards) = build_output_stack(&logging_config_without_outputs()).expect("build output stack");

        assert!(layers.is_empty());
        assert!(guards.is_empty());
    }

    #[test]
    fn output_stack_adds_syslog_layer_and_guard_when_enabled() {
        let (layers, guards) =
            build_output_stack(&logging_config_with_syslog(TEST_SYSLOG_URI)).expect("build output stack with syslog");

        assert_eq!(1, layers.len());
        assert_eq!(1, guards.len());
    }

    #[test]
    fn output_stack_fails_when_syslog_uri_is_invalid() {
        let config = logging_config_with_syslog("http://127.0.0.1:514");

        // The success value is not `Debug`, so go through `err()` rather than `expect_err`.
        let message = build_output_stack(&config)
            .err()
            .expect("invalid syslog URI should fail output stack build")
            .to_string();

        for fragment in [
            "Failed to build syslog log writer",
            "Unsupported syslog URI scheme 'http'",
        ] {
            assert!(message.contains(fragment));
        }
    }

    #[tokio::test]
    async fn reload_can_enable_change_disable_syslog_and_preserves_previous_stack_on_invalid_config() {
        use saluki_core::runtime::Supervisor;
        use tokio::sync::oneshot;

        // Assemble a `LoggingGuard` by hand, mirroring `initialize_logging` without installing a subscriber.
        let initial = logging_config_without_outputs();
        let (initial_stack, worker_guards) = build_output_stack(&initial).expect("build initial output stack");
        let (output_layer, stack_handle) = reload::Layer::new(initial_stack);
        let (filter_layer, filter_handle) = reload::Layer::new(initial.log_level.as_env_filter());
        let (override_worker, controller) = LoggingOverrideWorker::new(filter_handle);
        let mut guard = LoggingGuard {
            worker_guards,
            stack_handle,
            controller,
        };
        // The layers are never attached to a subscriber, so hold on to them for the duration of the test.
        let _keep_layers_alive = (output_layer, filter_layer);

        // Run the override worker in the background for the duration of the test.
        let mut supervisor = Supervisor::new("test-logging-override").expect("create supervisor");
        supervisor.add_worker(override_worker);
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let supervisor_task = tokio::spawn(async move { supervisor.run_with_shutdown(shutdown_rx).await });

        // Each step: configuration to load, number of worker guards expected afterwards, failure message.
        let successful_reloads = [
            (
                logging_config_with_syslog(TEST_SYSLOG_URI),
                1,
                "reload should enable syslog",
            ),
            (
                logging_config_with_syslog("udp://127.0.0.1:10"),
                1,
                "reload should change syslog URI",
            ),
            (logging_config_without_outputs(), 0, "reload should disable syslog"),
            (
                logging_config_with_syslog(TEST_SYSLOG_URI),
                1,
                "reload should re-enable syslog",
            ),
        ];
        for (next_config, expected_guards, failure_message) in successful_reloads {
            guard.reload(next_config).await.expect(failure_message);
            assert_eq!(guard.worker_guards.len(), expected_guards);
        }

        // A configuration that cannot be built must be rejected and must leave the previous guards in place.
        let reload_error = guard
            .reload(logging_config_with_syslog("http://127.0.0.1:514"))
            .await
            .expect_err("invalid syslog URI should fail reload");
        assert!(reload_error.to_string().contains("Failed to build syslog log writer"));
        assert_eq!(guard.worker_guards.len(), 1);

        shutdown_tx.send(()).expect("send shutdown");
        supervisor_task
            .await
            .expect("supervisor task joins")
            .expect("supervisor should exit cleanly");
        drop(guard);
    }
}