airlock/unix.rs
1//! Unix process driver for non-containerized integration tests.
2//!
3//! This module mirrors the relevant surface of the Docker [`Driver`][crate::driver::Driver] but
4//! spawns a local binary instead of a container. It exists so that integration tests can run on
5//! Unix hosts where ADP is exercised as a real host process rather than inside a container. The
6//! code path is portable across POSIX hosts (Linux + macOS); only macOS is exercised today, but
7//! the same module is used unchanged when we opt other Unix hosts into the suite.
8//!
9//! Only the small subset of the Docker driver surface needed by the panoramic Unix runner is
10//! implemented: spawn, log capture, exit watching, and cleanup.
11
12#[cfg(unix)]
13use std::time::Duration;
14use std::{
15 collections::HashMap,
16 path::PathBuf,
17 process::Stdio,
18 sync::{Arc, OnceLock},
19};
20
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use tokio::{
23 io::{AsyncBufReadExt as _, AsyncRead, BufReader},
24 process::Command,
25 sync::Mutex,
26 task::JoinHandle,
27};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, warn};
30
/// Write-once, shareable slot holding the exit status of a spawned [`UnixProcess`].
///
/// The background exit watcher started by [`UnixProcess::spawn`] fills the slot once it has
/// waited on the child, whether the child exited by itself or was signalled by
/// [`UnixProcess::cleanup`] during teardown. Readers (for example, the `adp_exits_with`
/// assertion in panoramic) should inspect the slot only after the exit token has fired.
///
/// The stored `Option<i32>` is `Some(code)` when the process exited normally with a status
/// code, and `None` when no code is available: the process was terminated by a signal, or
/// waiting on it failed.
pub type ExitCodeCell = Arc<OnceLock<Option<i32>>>;
40
/// Configuration for a Unix process to spawn.
///
/// The type is `Clone` so a single configuration can be reused for several spawns, and
/// `Debug` so it can be included in log output and assertion messages. Note that the derived
/// `Debug` representation prints the environment values verbatim.
#[derive(Clone, Debug)]
pub struct UnixProcessConfig {
    /// Display name used for logs and reporting.
    pub name: String,
    /// Absolute path to the binary to execute.
    pub binary_path: PathBuf,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Environment variables to set for the process.
    pub env: HashMap<String, String>,
}
53
54impl UnixProcessConfig {
55 /// Creates a new configuration with the given display name and binary path.
56 pub fn new(name: impl Into<String>, binary_path: impl Into<PathBuf>) -> Self {
57 Self {
58 name: name.into(),
59 binary_path: binary_path.into(),
60 args: Vec::new(),
61 env: HashMap::new(),
62 }
63 }
64
65 /// Sets the arguments for the process.
66 pub fn with_args(mut self, args: Vec<String>) -> Self {
67 self.args = args;
68 self
69 }
70
71 /// Sets all environment variables for the process at once.
72 pub fn with_env_map(mut self, env: HashMap<String, String>) -> Self {
73 self.env = env;
74 self
75 }
76}
77
/// Destination for log lines captured from a Unix process, usable as a trait object.
///
/// The trait is deliberately tiny: a consumer can implement it directly on whatever log
/// buffer type it already has, without taking a dependency on `airlock`.
pub trait LogSink: Send + Sync {
    /// Records one captured line of output.
    ///
    /// `is_stderr` tells the sink which stream produced the line: `true` for the process's
    /// stderr, `false` for its stdout.
    fn push_line(&mut self, line: String, is_stderr: bool);
}
87
88/// A spawned Unix process and its supporting tasks.
89///
90/// `UnixProcess` owns the child process plus background tasks that pump stdout/stderr lines
91/// into a shared sink and observe the child's exit. The provided exit token is cancelled when
92/// the child process exits on its own (observed by the background watcher) or when
93/// [`cleanup`][Self::cleanup] is called. The exit code is recorded in the shared
94/// [`ExitCodeCell`] returned by [`exit_code_cell`][Self::exit_code_cell].
95///
96/// The spawned process is always made the leader of a new process group, so
97/// [`cleanup`][Self::cleanup] can signal the entire group (parent plus any forked helpers).
98/// This matters for binaries like the Datadog Core Agent that spawn `trace-agent` /
99/// `process-agent` which would otherwise orphan onto the init/launchd system supervisor when
100/// only the parent is killed.
101pub struct UnixProcess {
102 name: String,
103 /// PGID of the spawned process. We made the child the group leader at spawn time, so this
104 /// equals the child's PID. `None` only if spawn failed to return a PID (very rare).
105 #[cfg(unix)]
106 process_group: Option<i32>,
107 exit_code: ExitCodeCell,
108 log_tasks: Vec<JoinHandle<()>>,
109 exit_task: Option<JoinHandle<()>>,
110}
111
112impl UnixProcess {
113 /// Spawns the process described by `config`. The provided `log_sink` receives each line of
114 /// captured stdout/stderr; the provided `exit_token` is cancelled when the process exits.
115 pub async fn spawn(
116 config: UnixProcessConfig, log_sink: Arc<Mutex<dyn LogSink>>, exit_token: CancellationToken,
117 ) -> Result<Self, GenericError> {
118 if !config.binary_path.exists() {
119 return Err(generic_error!(
120 "Binary not found at expected path: {}",
121 config.binary_path.display()
122 ));
123 }
124
125 let mut cmd = Command::new(&config.binary_path);
126 cmd.args(&config.args)
127 .envs(&config.env)
128 .stdout(Stdio::piped())
129 .stderr(Stdio::piped())
130 .kill_on_drop(true);
131 // Always place the spawned process in a new process group so cleanup can signal the
132 // entire group (parent + any forked helpers) without leaking orphans.
133 #[cfg(unix)]
134 cmd.process_group(0);
135
136 let mut child = cmd
137 .spawn()
138 .with_error_context(|| format!("Failed to spawn '{}'.", config.binary_path.display()))?;
139
140 #[cfg(unix)]
141 let process_group = child.id().map(|pid| pid as i32);
142
143 let stdout = child
144 .stdout
145 .take()
146 .ok_or_else(|| generic_error!("Failed to capture stdout."))?;
147 let stderr = child
148 .stderr
149 .take()
150 .ok_or_else(|| generic_error!("Failed to capture stderr."))?;
151
152 let stdout_task = spawn_log_pump(stdout, log_sink.clone(), false);
153 let stderr_task = spawn_log_pump(stderr, log_sink, true);
154
155 // Exit watcher: moves the child into the task, calls `wait()`, records the exit code,
156 // and fires the exit token so blocked assertions (process_stable_for / adp_exits_with)
157 // unblock immediately rather than waiting for the test's own cleanup phase.
158 let exit_code: ExitCodeCell = Arc::new(OnceLock::new());
159 let exit_code_for_watcher = exit_code.clone();
160 let name_for_watcher = config.name.clone();
161 let exit_task = tokio::spawn(async move {
162 match child.wait().await {
163 Ok(status) => {
164 let code = status.code();
165 debug!(name = %name_for_watcher, ?code, "Unix process exited.");
166 let _ = exit_code_for_watcher.set(code);
167 }
168 Err(e) => {
169 warn!(name = %name_for_watcher, error = %e, "Failed to wait on Unix process; treating as exited.");
170 let _ = exit_code_for_watcher.set(None);
171 }
172 }
173 exit_token.cancel();
174 });
175
176 Ok(Self {
177 name: config.name,
178 #[cfg(unix)]
179 process_group,
180 exit_code,
181 log_tasks: vec![stdout_task, stderr_task],
182 exit_task: Some(exit_task),
183 })
184 }
185
186 /// Returns the display name of the process.
187 pub fn name(&self) -> &str {
188 &self.name
189 }
190
191 /// Returns a clone of the shared exit-code cell. The cell is populated once the process
192 /// exits (either on its own or via cleanup). Consumers should wait on the exit token they
193 /// passed to [`spawn`][Self::spawn] before reading.
194 pub fn exit_code_cell(&self) -> ExitCodeCell {
195 self.exit_code.clone()
196 }
197
198 /// Kills the spawned process group, joins background tasks, and cancels the exit token.
199 ///
200 /// Sends SIGTERM to the whole group, waits a short grace period, then sends SIGKILL to
201 /// guarantee nothing is left behind. The grace period gives well-behaved descendants
202 /// (for example, the Core Agent's `trace-agent` / `process-agent` helpers) a chance to
203 /// shut down cleanly before we hard-kill them.
204 pub async fn cleanup(mut self) {
205 #[cfg(unix)]
206 if let Some(pgid) = self.process_group {
207 // SAFETY: killpg with a valid pgid is a safe syscall; we ignore the return value.
208 unsafe {
209 libc::killpg(pgid, libc::SIGTERM);
210 }
211 tokio::time::sleep(Duration::from_millis(500)).await;
212 unsafe {
213 libc::killpg(pgid, libc::SIGKILL);
214 }
215 }
216
217 // The exit watcher will have observed the kill, set the exit code, and fired the exit
218 // token. Join it (and the log pumps) so we don't leak tasks.
219 if let Some(handle) = self.exit_task.take() {
220 let _ = handle.await;
221 }
222 for handle in self.log_tasks.drain(..) {
223 let _ = handle.await;
224 }
225 }
226}
227
228impl Drop for UnixProcess {
229 fn drop(&mut self) {
230 if self.exit_task.is_some() {
231 warn!(
232 name = %self.name,
233 "UnixProcess dropped without explicit cleanup; child may have been killed via kill_on_drop."
234 );
235 }
236 }
237}
238
239fn spawn_log_pump<R>(reader: R, sink: Arc<Mutex<dyn LogSink>>, is_stderr: bool) -> JoinHandle<()>
240where
241 R: AsyncRead + Unpin + Send + 'static,
242{
243 let mut lines = BufReader::new(reader).lines();
244 tokio::spawn(async move {
245 loop {
246 match lines.next_line().await {
247 Ok(Some(line)) => {
248 let mut sink = sink.lock().await;
249 sink.push_line(line, is_stderr);
250 }
251 Ok(None) => break,
252 Err(e) => {
253 debug!(error = %e, "Log pump read error; stopping.");
254 break;
255 }
256 }
257 }
258 })
259}