Skip to main content

airlock/
unix.rs

1//! Unix process driver for non-containerized integration tests.
2//!
3//! This module mirrors the relevant surface of the Docker [`Driver`][crate::driver::Driver] but
4//! spawns a local binary instead of a container. It exists so that integration tests can run on
5//! Unix hosts where ADP is exercised as a real host process rather than inside a container. The
6//! code path is portable across POSIX hosts (Linux + macOS); only macOS is exercised today, but
7//! the same module is used unchanged when we opt other Unix hosts into the suite.
8//!
9//! Only the small subset of the Docker driver surface needed by the panoramic Unix runner is
10//! implemented: spawn, log capture, exit watching, and cleanup.
11
12#[cfg(unix)]
13use std::time::Duration;
14use std::{
15    collections::HashMap,
16    path::PathBuf,
17    process::Stdio,
18    sync::{Arc, OnceLock},
19};
20
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use tokio::{
23    io::{AsyncBufReadExt as _, AsyncRead, BufReader},
24    process::Command,
25    sync::Mutex,
26    task::JoinHandle,
27};
28use tokio_util::sync::CancellationToken;
29use tracing::{debug, warn};
30
/// Shared, write-once cell that receives the exit code of a spawned [`UnixProcess`].
///
/// The cell is written by the background exit watcher started in [`UnixProcess::spawn`], both
/// when the child exits on its own and when it is killed by [`UnixProcess::cleanup`]. Consumers
/// (for example, the `adp_exits_with` assertion in panoramic) read the cell after the exit token
/// fires.
///
/// The inner `Option<i32>` is `None` if the process was terminated by a signal rather than
/// exiting normally with a status code, or if waiting on the process failed.
pub type ExitCodeCell = Arc<OnceLock<Option<i32>>>;
40
/// Configuration for a Unix process to spawn.
///
/// All fields are public, so a value can be built either with a struct literal or through
/// the builder-style helpers on the type. `Debug` is derived so a configuration can be included
/// in log output and assertion failure messages.
#[derive(Clone, Debug)]
pub struct UnixProcessConfig {
    /// Display name used for logs and reporting.
    pub name: String,
    /// Absolute path to the binary to execute.
    pub binary_path: PathBuf,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Environment variables to set for the process.
    pub env: HashMap<String, String>,
}
53
54impl UnixProcessConfig {
55    /// Creates a new configuration with the given display name and binary path.
56    pub fn new(name: impl Into<String>, binary_path: impl Into<PathBuf>) -> Self {
57        Self {
58            name: name.into(),
59            binary_path: binary_path.into(),
60            args: Vec::new(),
61            env: HashMap::new(),
62        }
63    }
64
65    /// Sets the arguments for the process.
66    pub fn with_args(mut self, args: Vec<String>) -> Self {
67        self.args = args;
68        self
69    }
70
71    /// Sets all environment variables for the process at once.
72    pub fn with_env_map(mut self, env: HashMap<String, String>) -> Self {
73        self.env = env;
74        self
75    }
76}
77
/// A trait-object-friendly sink for log lines captured from a Unix process.
///
/// This is intentionally minimal so consumers can implement it on their own log buffer type.
/// [`UnixProcess::spawn`] takes the sink as `Arc<Mutex<dyn LogSink>>` and shares it between the
/// stdout and stderr pump tasks, which is why implementors must be `Send + Sync`.
pub trait LogSink: Send + Sync {
    /// Pushes a captured log line. `is_stderr` is `true` for lines that came from the
    /// process's stderr stream, `false` for stdout.
    fn push_line(&mut self, line: String, is_stderr: bool);
}
87
/// A spawned Unix process and its supporting tasks.
///
/// `UnixProcess` holds the handles of the background tasks that pump stdout/stderr lines into
/// a shared sink and observe the child's exit; the child handle itself is owned by the exit
/// watcher task. The provided exit token is cancelled by that watcher once the child has
/// exited, whether it exited on its own or was killed by [`cleanup`][Self::cleanup]. The exit
/// code is recorded in the shared [`ExitCodeCell`] returned by
/// [`exit_code_cell`][Self::exit_code_cell].
///
/// The spawned process is always made the leader of a new process group, so
/// [`cleanup`][Self::cleanup] can signal the entire group (parent plus any forked helpers).
/// This matters for binaries like the Datadog Core Agent that spawn `trace-agent` /
/// `process-agent` which would otherwise orphan onto the init/launchd system supervisor when
/// only the parent is killed.
pub struct UnixProcess {
    /// Display name, used in log messages and returned by [`name`][Self::name].
    name: String,
    /// PGID of the spawned process. We made the child the group leader at spawn time, so this
    /// equals the child's PID. `None` only if the child handle reported no PID right after
    /// spawning (very rare).
    #[cfg(unix)]
    process_group: Option<i32>,
    /// Shared cell the exit watcher writes the child's exit code into.
    exit_code: ExitCodeCell,
    /// Handles of the stdout and stderr log pump tasks, joined during cleanup.
    log_tasks: Vec<JoinHandle<()>>,
    /// Handle of the exit watcher task. `cleanup` takes it (leaving `None`), which is how
    /// `Drop` tells an explicit cleanup apart from a plain drop.
    exit_task: Option<JoinHandle<()>>,
}
111
112impl UnixProcess {
113    /// Spawns the process described by `config`. The provided `log_sink` receives each line of
114    /// captured stdout/stderr; the provided `exit_token` is cancelled when the process exits.
115    pub async fn spawn(
116        config: UnixProcessConfig, log_sink: Arc<Mutex<dyn LogSink>>, exit_token: CancellationToken,
117    ) -> Result<Self, GenericError> {
118        if !config.binary_path.exists() {
119            return Err(generic_error!(
120                "Binary not found at expected path: {}",
121                config.binary_path.display()
122            ));
123        }
124
125        let mut cmd = Command::new(&config.binary_path);
126        cmd.args(&config.args)
127            .envs(&config.env)
128            .stdout(Stdio::piped())
129            .stderr(Stdio::piped())
130            .kill_on_drop(true);
131        // Always place the spawned process in a new process group so cleanup can signal the
132        // entire group (parent + any forked helpers) without leaking orphans.
133        #[cfg(unix)]
134        cmd.process_group(0);
135
136        let mut child = cmd
137            .spawn()
138            .with_error_context(|| format!("Failed to spawn '{}'.", config.binary_path.display()))?;
139
140        #[cfg(unix)]
141        let process_group = child.id().map(|pid| pid as i32);
142
143        let stdout = child
144            .stdout
145            .take()
146            .ok_or_else(|| generic_error!("Failed to capture stdout."))?;
147        let stderr = child
148            .stderr
149            .take()
150            .ok_or_else(|| generic_error!("Failed to capture stderr."))?;
151
152        let stdout_task = spawn_log_pump(stdout, log_sink.clone(), false);
153        let stderr_task = spawn_log_pump(stderr, log_sink, true);
154
155        // Exit watcher: moves the child into the task, calls `wait()`, records the exit code,
156        // and fires the exit token so blocked assertions (process_stable_for / adp_exits_with)
157        // unblock immediately rather than waiting for the test's own cleanup phase.
158        let exit_code: ExitCodeCell = Arc::new(OnceLock::new());
159        let exit_code_for_watcher = exit_code.clone();
160        let name_for_watcher = config.name.clone();
161        let exit_task = tokio::spawn(async move {
162            match child.wait().await {
163                Ok(status) => {
164                    let code = status.code();
165                    debug!(name = %name_for_watcher, ?code, "Unix process exited.");
166                    let _ = exit_code_for_watcher.set(code);
167                }
168                Err(e) => {
169                    warn!(name = %name_for_watcher, error = %e, "Failed to wait on Unix process; treating as exited.");
170                    let _ = exit_code_for_watcher.set(None);
171                }
172            }
173            exit_token.cancel();
174        });
175
176        Ok(Self {
177            name: config.name,
178            #[cfg(unix)]
179            process_group,
180            exit_code,
181            log_tasks: vec![stdout_task, stderr_task],
182            exit_task: Some(exit_task),
183        })
184    }
185
186    /// Returns the display name of the process.
187    pub fn name(&self) -> &str {
188        &self.name
189    }
190
191    /// Returns a clone of the shared exit-code cell. The cell is populated once the process
192    /// exits (either on its own or via cleanup). Consumers should wait on the exit token they
193    /// passed to [`spawn`][Self::spawn] before reading.
194    pub fn exit_code_cell(&self) -> ExitCodeCell {
195        self.exit_code.clone()
196    }
197
198    /// Kills the spawned process group, joins background tasks, and cancels the exit token.
199    ///
200    /// Sends SIGTERM to the whole group, waits a short grace period, then sends SIGKILL to
201    /// guarantee nothing is left behind. The grace period gives well-behaved descendants
202    /// (for example, the Core Agent's `trace-agent` / `process-agent` helpers) a chance to
203    /// shut down cleanly before we hard-kill them.
204    pub async fn cleanup(mut self) {
205        #[cfg(unix)]
206        if let Some(pgid) = self.process_group {
207            // SAFETY: killpg with a valid pgid is a safe syscall; we ignore the return value.
208            unsafe {
209                libc::killpg(pgid, libc::SIGTERM);
210            }
211            tokio::time::sleep(Duration::from_millis(500)).await;
212            unsafe {
213                libc::killpg(pgid, libc::SIGKILL);
214            }
215        }
216
217        // The exit watcher will have observed the kill, set the exit code, and fired the exit
218        // token. Join it (and the log pumps) so we don't leak tasks.
219        if let Some(handle) = self.exit_task.take() {
220            let _ = handle.await;
221        }
222        for handle in self.log_tasks.drain(..) {
223            let _ = handle.await;
224        }
225    }
226}
227
228impl Drop for UnixProcess {
229    fn drop(&mut self) {
230        if self.exit_task.is_some() {
231            warn!(
232                name = %self.name,
233                "UnixProcess dropped without explicit cleanup; child may have been killed via kill_on_drop."
234            );
235        }
236    }
237}
238
239fn spawn_log_pump<R>(reader: R, sink: Arc<Mutex<dyn LogSink>>, is_stderr: bool) -> JoinHandle<()>
240where
241    R: AsyncRead + Unpin + Send + 'static,
242{
243    let mut lines = BufReader::new(reader).lines();
244    tokio::spawn(async move {
245        loop {
246            match lines.next_line().await {
247                Ok(Some(line)) => {
248                    let mut sink = sink.lock().await;
249                    sink.push_line(line, is_stderr);
250                }
251                Ok(None) => break,
252                Err(e) => {
253                    debug!(error = %e, "Log pump read error; stopping.");
254                    break;
255                }
256            }
257        }
258    })
259}