airlock/driver.rs

use std::{
    collections::HashMap,
    fmt,
    path::{Path, PathBuf},
    time::{Duration, Instant},
};

use bollard::{
    container::{Config, CreateContainerOptions, ListContainersOptions, LogOutput, LogsOptions},
    errors::Error,
    image::CreateImageOptions,
    models::{HealthConfig, HealthStatusEnum, HostConfig, Ipam},
    network::CreateNetworkOptions,
    secret::ContainerStateStatusEnum,
    volume::CreateVolumeOptions,
    Docker,
};
use futures::StreamExt as _;
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use tokio::{
    io::{AsyncWriteExt as _, BufWriter},
    time::sleep,
};
use tracing::{debug, error, trace};

use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};

const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";
const DATADOG_INTAKE_CONFIG_PATH_INTERNAL: &str = "/etc/datadog-intake/config.toml";

/// The exit status of a finished container.
pub enum ExitStatus {
    /// The container exited successfully (exit code 0).
    Success,
    /// The container exited with a non-zero exit code, along with any error message reported by Docker.
    Failed { code: i64, error: String },
}

impl fmt::Display for ExitStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExitStatus::Success => write!(f, "success (0)"),
            ExitStatus::Failed { code, error } => write!(f, "failed (exit code: {}, error: {})", code, error),
        }
    }
}

/// Driver configuration.
///
/// This is the basic set of configuration options needed to spawn the container for a given driver.
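///
/// # Example
///
/// A minimal sketch of building a configuration through one of the constructors and the builder methods. The
/// `target_config` value is assumed to be a `TargetConfig` loaded elsewhere, and the environment variable and port
/// shown are placeholders:
///
/// ```ignore
/// let driver_config = DriverConfig::target("example-target", target_config)
///     .await?
///     .with_env_var("EXAMPLE_VAR", "value")
///     .with_exposed_port("tcp", 8080);
/// ```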
#[derive(Clone)]
pub struct DriverConfig {
    driver_id: &'static str,
    image: String,
    entrypoint: Option<Vec<String>>,
    command: Option<Vec<String>>,
    env: Vec<String>,
    binds: Vec<String>,
    healthcheck: Option<HealthConfig>,
    exposed_ports: Vec<(&'static str, u16)>,
}

impl DriverConfig {
    /// Creates a `DriverConfig` for running a millstone container with the given configuration.
    ///
    /// # Errors
    ///
    /// If the specified configuration file does not exist, cannot be accessed, or is not a regular file, an error
    /// will be returned.
    pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
        // Ensure the given configuration file path actually exists.
        match tokio::fs::metadata(&config.config_path).await {
            Ok(metadata) if metadata.is_file() => {}
            Ok(_) => {
                return Err(generic_error!(
                    "Specified millstone configuration path ({}) does not point to a file.",
                    config.config_path.display()
                ))
            }
            Err(e) => {
                return Err(generic_error!(
                    "Failed to ensure specified millstone configuration ({}) exists locally: {}",
                    config.config_path.display(),
                    e
                ))
            }
        }

        let millstone_binary_path = config
            .binary_path
            .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
        let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];

        let driver_config = Self::from_image("millstone", config.image)
            .with_entrypoint(entrypoint)
            .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);

        Ok(driver_config)
    }

    /// Creates a `DriverConfig` for running a datadog-intake container with the given configuration.
    ///
    /// # Errors
    ///
    /// If the specified configuration file does not exist, cannot be accessed, or is not a regular file, an error
    /// will be returned.
    pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
        // Ensure the given configuration file path actually exists.
        match tokio::fs::metadata(&config.config_path).await {
            Ok(metadata) if metadata.is_file() => {}
            Ok(_) => {
                return Err(generic_error!(
                    "Specified datadog-intake configuration ({}) does not point to a file.",
                    config.config_path.display()
                ))
            }
            Err(e) => {
                return Err(generic_error!(
                    "Failed to ensure specified datadog-intake configuration ({}) exists locally: {}",
                    config.config_path.display(),
                    e
                ))
            }
        }

        let datadog_intake_binary_path = config
            .binary_path
            .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
        let entrypoint = vec![
            datadog_intake_binary_path,
            DATADOG_INTAKE_CONFIG_PATH_INTERNAL.to_string(),
        ];

        let driver_config = DriverConfig::from_image("datadog-intake", config.image)
            .with_entrypoint(entrypoint)
            .with_bind_mount(config.config_path, DATADOG_INTAKE_CONFIG_PATH_INTERNAL)
            // Map our intake port to an ephemeral port on the host side, which we'll query once the container has been
            // started so that we can connect to it.
            .with_exposed_port("tcp", 2049);

        Ok(driver_config)
    }

    /// Creates a `DriverConfig` for running a target container with the given identifier and configuration.
    pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
        let driver_config = DriverConfig::from_image(target_id, config.image)
            .with_entrypoint(config.entrypoint)
            .with_command(config.command)
            .with_env_vars(config.additional_env_vars);

        Ok(driver_config)
    }

    /// Creates a new `DriverConfig` from the given driver identifier and container image reference.
    fn from_image(driver_id: &'static str, image: String) -> Self {
        Self {
            driver_id,
            image,
            entrypoint: None,
            command: None,
            env: vec![],
            binds: vec![],
            healthcheck: None,
            exposed_ports: vec![],
        }
    }

    /// Sets the entrypoint for the container.
    ///
    /// If `entrypoint` is empty, the default entrypoint will be used.
    pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
        if !entrypoint.is_empty() {
            self.entrypoint = Some(entrypoint);
        }
        self
    }

    /// Sets the command for the container.
    ///
    /// If `command` is empty, the default command will be used.
    pub fn with_command(mut self, command: Vec<String>) -> Self {
        if !command.is_empty() {
            self.command = Some(command);
        }
        self
    }

    /// Adds an environment variable to the container.
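    ///
    /// # Example
    ///
    /// Illustrative only; the key and value shown are placeholders:
    ///
    /// ```ignore
    /// let config = config.with_env_var("RUST_LOG", "debug");
    /// ```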
    pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
    where
        K: AsRef<str>,
        V: AsRef<str>,
    {
        self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
        self
    }

    /// Adds environment variables to the container.
    pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
        self.env.extend(env);
        self
    }

    /// Adds a bind mount to the container.
    ///
    /// `host_path` represents the path on the host to mount, while `container_path` represents the path on the
    /// container side to mount it to. Bind mounts can be either files or directories.
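    ///
    /// # Example
    ///
    /// A sketch showing how the two paths combine into a Docker-style `host:container` bind specification; the
    /// paths are placeholders:
    ///
    /// ```ignore
    /// // Results in the bind string "/tmp/my-config.toml:/etc/millstone/config.toml".
    /// let config = config.with_bind_mount("/tmp/my-config.toml", "/etc/millstone/config.toml");
    /// ```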
    pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
    where
        HP: AsRef<Path>,
        CP: AsRef<Path>,
    {
        let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
        self.binds.push(bind_mount);
        self
    }

    /// Sets the healthcheck for the container.
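    ///
    /// # Example
    ///
    /// A sketch of wiring up a simple `curl`-based healthcheck; the command, URL, and timings are illustrative, not
    /// recommendations:
    ///
    /// ```ignore
    /// let config = config.with_healthcheck(
    ///     vec!["curl".to_string(), "-f".to_string(), "http://localhost:8080/health".to_string()],
    ///     Duration::from_secs(5),  // interval
    ///     Duration::from_secs(2),  // timeout
    ///     3,                       // retries
    ///     Duration::from_secs(10), // start period
    ///     Duration::from_secs(1),  // start interval
    /// );
    /// ```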
    pub fn with_healthcheck(
        mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
        start_period: Duration, start_interval: Duration,
    ) -> Self {
        // We manually insert "CMD" as the first value in the command array, so that it doesn't have to be done by the
        // caller, since it's some goofy ass syntax to have to know about.
        test_command.insert(0, "CMD".to_string());

        self.healthcheck = Some(HealthConfig {
            test: Some(test_command),
            interval: Some(interval.as_nanos() as i64),
            timeout: Some(timeout.as_nanos() as i64),
            retries: Some(retries),
            start_period: Some(start_period.as_nanos() as i64),
            start_interval: Some(start_interval.as_nanos() as i64),
        });
        self
    }

    /// Adds an exposed port to the container.
    ///
    /// Exposed ports are mapped from a port inside the container to an ephemeral port on the host. The `protocol`
    /// should be either `tcp` or `udp`. A port on the host side is picked from the "local" port range. For example, on
    /// Linux the range is defined by `/proc/sys/net/ipv4/ip_local_port_range`.
    ///
    /// When starting the driver via [`Driver::start`][crate::driver::Driver::start], the ephemeral port mappings will
    /// be returned in [`DriverDetails`].
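    ///
    /// # Example
    ///
    /// A sketch mirroring how the datadog-intake driver exposes its intake port:
    ///
    /// ```ignore
    /// // Expose the container's TCP port 2049; Docker picks the host-side ephemeral port.
    /// let config = config.with_exposed_port("tcp", 2049);
    /// ```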
    pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
        self.exposed_ports.push((protocol, internal_port));
        self
    }
}

/// Detailed information about the spawned container.
#[derive(Debug, Default)]
pub struct DriverDetails {
    container_name: String,
    port_mappings: Option<HashMap<String, u16>>,
}

impl DriverDetails {
    /// Returns the name of the container.
    pub fn container_name(&self) -> &str {
        &self.container_name
    }

    /// Attempts to look up a mapped ephemeral port for the given exposed port.
    ///
    /// The same `protocol` and internal port values used to expose the port must be used here. If the given
    /// protocol/port combination was not exposed, `None` is returned. Otherwise, the mapped ephemeral port is returned.
    /// This port is exposed on `0.0.0.0` on the host side.
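    ///
    /// # Example
    ///
    /// A sketch assuming the container exposed TCP port 2049 via [`DriverConfig::with_exposed_port`] and `details`
    /// is the `DriverDetails` returned from [`Driver::start`][crate::driver::Driver::start]:
    ///
    /// ```ignore
    /// if let Some(host_port) = details.try_get_exposed_port("tcp", 2049) {
    ///     println!("intake reachable at 127.0.0.1:{}", host_port);
    /// }
    /// ```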
    pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
        self.port_mappings
            .as_ref()
            .and_then(|port_mappings| port_mappings.get(&format!("{}/{}", internal_port, protocol)).copied())
    }
}

/// Container driver.
pub struct Driver {
    isolation_group_id: String,
    isolation_group_name: String,
    container_name: String,
    config: DriverConfig,
    docker: Docker,
    log_dir: Option<PathBuf>,
}

impl Driver {
    /// Creates a new `Driver` from the given isolation group ID and configuration.
    ///
    /// # Isolation group
    ///
    /// The isolation group ID serves as a unique identifier to be used for both the name of the container as well as
    /// the shared resources that are created and attached to the container. If two drivers share the same isolation
    /// group ID, the containers they spawn will be located in the same network namespace, have access to the same
    /// shared Airlock volume, etc.
    ///
    /// # Shared volume
    ///
    /// The container will have a volume bind-mounted at `/airlock` that is shared between all containers in the same
    /// isolation group. This volume is mounted as world writeable (777) so all containers can freely read and write to
    /// it. This makes it easier for containers to share data with one another, but also means that care should be
    /// taken to avoid conflicts, such as two containers writing to the same file.
    ///
    /// # Errors
    ///
    /// If the Docker client cannot be created/configured, an error will be returned.
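    ///
    /// # Example
    ///
    /// A sketch of the typical driver lifecycle; the isolation group ID, log directory, and the origin of
    /// `driver_config` are illustrative:
    ///
    /// ```ignore
    /// let mut driver = Driver::from_config("test-run-1".to_string(), driver_config)?
    ///     .with_logging(PathBuf::from("./airlock-logs"));
    ///
    /// let details = driver.start().await?;
    /// driver.wait_for_container_healthy().await?;
    /// let exit_status = driver.wait_for_container_exit().await?;
    /// driver.cleanup().await?;
    /// ```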
    pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
        let docker = Docker::connect_with_defaults()?;

        Ok(Self {
            isolation_group_name: format!("airlock-{}", isolation_group_id),
            container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
            isolation_group_id,
            config,
            docker,
            log_dir: None,
        })
    }

    /// Configures the driver to capture container logs.
    ///
    /// The logs will be stored in the given directory, under a subdirectory named after the isolation group ID. Each
    /// container will get a log for standard output and standard error, following the pattern of
    /// `<driver ID>.[stdout|stderr].log`.
    pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
        self.log_dir = Some(log_dir);
        self
    }

    /// Returns the string identifier of the driver.
    ///
    /// This is generally a shorthand of the application/service, such as `dogstatsd` or `millstone`.
    pub fn driver_id(&self) -> &'static str {
        self.config.driver_id
    }

    /// Cleans up any containers, networks, and volumes related to the given isolation group ID.
    ///
    /// This is an associated function that requires no `Driver` instance, to facilitate cleaning up resources after a
    /// number of drivers have been run.
    ///
    /// # Errors
    ///
    /// If the Docker client cannot be created/configured, or there is an error when finding or removing any of the
    /// related resources, an error will be returned.
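    ///
    /// # Example
    ///
    /// A sketch of cleaning up after a run; the isolation group ID is illustrative:
    ///
    /// ```ignore
    /// Driver::clean_related_resources("test-run-1".to_string()).await?;
    /// ```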
    pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
        let docker = Docker::connect_with_defaults()?;

        let isolation_group_name = format!("airlock-{}", isolation_group_id);
        let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);

        // Remove any containers related to the isolation group. We do so forcefully.
        let list_options = Some(ListContainersOptions {
            all: true,
            filters: vec![("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
                .into_iter()
                .collect(),
            ..Default::default()
        });
        let containers = docker.list_containers(list_options).await.with_error_context(|| {
            format!(
                "Failed to list containers attached to isolation group '{}'.",
                isolation_group_id
            )
        })?;

        for container in containers {
            let container_name = match container.id {
                Some(id) => id,
                None => {
                    debug!("Listed container had no ID. Skipping removal.");
                    continue;
                }
            };

            if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
                error!(error = %e, "Failed to stop container '{}'.", container_name);
                continue;
            } else {
                debug!("Stopped container '{}'.", container_name);
            }

            if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
                error!(error = %e, "Failed to remove container '{}'.", container_name);
                continue;
            } else {
                debug!("Removed container '{}'.", container_name);
            }
        }

        // Remove the shared volume.
        if let Err(e) = docker.remove_volume(isolation_group_name.as_str(), None).await {
            error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
        } else {
            debug!("Removed shared volume '{}'.", isolation_group_name);
        }

        // Remove the network.
        if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
            error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
        } else {
            debug!("Removed shared network '{}'.", isolation_group_name);
        }

        Ok(())
    }

    async fn create_network_if_missing(&self) -> Result<(), GenericError> {
        // See if the network already exists or not.
        let networks = self.docker.list_networks::<String>(None).await?;
        if networks
            .iter()
            .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
        {
            debug!("Network '{}' already exists.", self.isolation_group_name);
            return Ok(());
        }

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Network '{}' does not exist. Creating...",
            self.isolation_group_name
        );

        // Create the network since it doesn't yet exist.
        let network_options = CreateNetworkOptions {
            name: self.isolation_group_name.clone(),
            check_duplicate: true,
            driver: "bridge".to_string(),
            ipam: Ipam::default(),
            enable_ipv6: false,
            labels: get_default_airlock_labels(self.isolation_group_id.as_str()),
            ..Default::default()
        };
        let response = self.docker.create_network(network_options).await?;
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Created network '{}' (ID: {:?}).",
            self.isolation_group_name,
            response.id
        );

        Ok(())
    }

    async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
        let image_options = CreateImageOptions {
            from_image: image,
            ..Default::default()
        };

        let mut create_stream = self.docker.create_image(Some(image_options), None, None);
        while let Some(info) = create_stream.next().await {
            trace!(
                driver_id = self.config.driver_id,
                isolation_group = self.isolation_group_id,
                image,
                "Received image pull update: {:?}",
                info
            );
        }

        Ok(())
    }

    async fn create_image_if_missing(&self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Pulling image '{}'...",
            self.config.image
        );

        self.create_image_if_missing_inner(self.config.image.as_str()).await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Pulled image '{}'.",
            self.config.image
        );

        Ok(())
    }

    async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
        // Check to see if the shared volume already exists.
        let volumes = self.docker.list_volumes::<String>(None).await?;
        if volumes
            .volumes
            .iter()
            .flatten()
            .any(|volume| volume.name == self.isolation_group_name.as_str())
        {
            debug!("Shared volume '{}' already exists.", self.isolation_group_name);
            return Ok(());
        }

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Shared volume '{}' does not exist. Creating...",
            self.isolation_group_name
        );

        let volume_options = CreateVolumeOptions {
            name: self.isolation_group_name.clone(),
            driver: "local".to_string(),
            labels: get_default_airlock_labels(self.isolation_group_id.as_str()),
            ..Default::default()
        };
        self.docker.create_volume(volume_options).await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Created shared volume '{}'.",
            self.isolation_group_name
        );

        Ok(())
    }

    async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Adjusting permissions on shared volume '{}'...",
            self.isolation_group_name
        );

        // We spin up a minimal Alpine container, chmod the directory bind-mounted to the shared volume, and that's it.
        let image = get_alpine_container_image();
        self.create_image_if_missing_inner(&image).await?;

        let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
        let entrypoint = vec![
            "chmod".to_string(),
            "-R".to_string(),
            "777".to_string(),
            "/airlock".to_string(),
        ];
        let _ = self
            .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
            .await?;

        self.start_container_inner(&container_name).await?;
        self.wait_for_container_exit_inner(&container_name).await?;
        self.cleanup_inner(&container_name).await?;

        Ok(())
    }

    async fn create_container_inner(
        &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
        mut binds: Vec<String>, env: Option<Vec<String>>,
    ) -> Result<String, GenericError> {
        // Take the configured binds for the container and add in our shared volume that all containers in this
        // isolation group will use.
        binds.push(format!("{}:/airlock:z", self.isolation_group_name));

        // Map specific host-level paths into the containers so they can access host-level resources needed for origin
        // detection.
        binds.push("/proc:/host/proc:ro".to_string());
        binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
        binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());

        let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
            (None, None)
        } else {
            let mut exposed_ports = HashMap::new();
            for (protocol, internal_port) in &self.config.exposed_ports {
                exposed_ports.insert(format!("{}/{}", internal_port, protocol), HashMap::new());
            }

            (Some(true), Some(exposed_ports))
        };

        let container_config = Config {
            hostname: Some(self.config.driver_id.to_string()),
            env,
            image: Some(image),
            entrypoint,
            cmd,
            host_config: Some(HostConfig {
                binds: Some(binds),
                network_mode: Some(self.isolation_group_name.clone()),
                publish_all_ports,
                pid_mode: Some("host".to_string()),
                ..Default::default()
            }),
            healthcheck: self.config.healthcheck.clone(),
            exposed_ports,
            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
            ..Default::default()
        };

        let create_options = CreateContainerOptions {
            name: container_name,
            ..Default::default()
        };

        let response = self
            .docker
            .create_container(Some(create_options), container_config)
            .await?;

        Ok(response.id)
    }

    async fn create_container(&self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Creating container '{}'...",
            self.container_name
        );

        let container_id = self
            .create_container_inner(
                self.container_name.clone(),
                self.config.image.clone(),
                self.config.entrypoint.clone(),
                self.config.command.clone(),
                self.config.binds.clone(),
                Some(self.config.env.clone()),
            )
            .await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Created container '{}' (ID: {}).",
            self.container_name,
            container_id
        );

        Ok(())
    }

    async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
        self.docker.start_container::<String>(container_name, None).await?;

        let mut details = DriverDetails {
            container_name: container_name.to_string(),
            ..Default::default()
        };

        let response = self.docker.inspect_container(container_name, None).await?;
        if let Some(network_settings) = response.network_settings {
            if let Some(ports) = network_settings.ports {
                let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
                for (internal_port, bindings) in ports {
                    if let Some(bindings) = bindings {
                        for binding in bindings {
                            if let Some(host_ip) = binding.host_ip.as_deref() {
                                if host_ip == "0.0.0.0" {
                                    let maybe_host_port =
                                        binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());

                                    if let Some(host_port) = maybe_host_port {
                                        port_mappings.insert(internal_port, host_port);
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        Ok(details)
    }

    async fn start_container(&self) -> Result<DriverDetails, GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Starting container '{}'...",
            self.container_name
        );

        let details = self.start_container_inner(&self.container_name).await?;

        if let Some(log_dir) = self.log_dir.clone() {
            debug!(
                "Capturing logs for container '{}' to {}...",
                self.container_name,
                log_dir.display()
            );

            self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
                .await?;
        }

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Started container '{}'.",
            self.container_name
        );

        Ok(details)
    }

    /// Starts the container, creating any necessary resources.
    ///
    /// # Errors
    ///
    /// If there is an error while creating the network or shared volume, while pulling the container image, or while
    /// creating or starting the container, it will be returned.
    pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
        self.create_network_if_missing().await?;
        self.create_image_if_missing().await?;
        self.create_volume_if_missing().await?;
        self.adjust_shared_volume_permissions().await?;

        self.create_container().await?;
        self.start_container().await
    }

    /// Waits until the container is marked as healthy.
    ///
    /// If the container has no health checks defined, this returns early and does no waiting.
    ///
    /// # Errors
    ///
    /// If there is an error while inspecting the container, it will be returned.
    pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
        loop {
            // Inspect the container, and see if it even has any health checks defined. If not, then we can return early.
            let response = self.docker.inspect_container(&self.container_name, None).await?;
            let state = response
                .state
                .ok_or_else(|| generic_error!("Container state should be present."))?;

            // Make sure the container is actually running.
            let status = state
                .status
                .ok_or_else(|| generic_error!("Container status should be present."))?;
            if status != ContainerStateStatusEnum::RUNNING {
                return Err(generic_error!("Container exited unexpectedly."));
            }

            if let Some(health_status) = state.health.and_then(|h| h.status) {
                match health_status {
                    // No healthcheck defined, or healthy, so we're good to go.
                    HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
                        debug!(
                            driver_id = self.config.driver_id,
                            "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
                        );
                        return Ok(());
                    }

                    // Not healthy yet, so we'll keep waiting.
                    HealthStatusEnum::STARTING | HealthStatusEnum::UNHEALTHY => {
                        debug!(
                            driver_id = self.config.driver_id,
                            "Container '{}' not yet healthy. Waiting...", &self.container_name
                        );
                    }
                }
            } else {
                debug!(
                    driver_id = self.config.driver_id,
                    "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
                );
                return Ok(());
            }

            // Wait for a second and then check again.
            sleep(Duration::from_secs(1)).await;
        }
    }

    async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
        let mut wait_stream = self.docker.wait_container::<String>(container_name, None);
        match wait_stream.next().await {
            Some(result) => match result {
                Ok(response) => {
                    // When the exit code is non-zero, `bollard` transforms the normal `ContainerWaitResponse` into
                    // `Error::DockerContainerWaitError`, which is why we have these asserts here to catch any scenario
                    // where there's _somehow_ an error condition being indicated without it having been transformed into
                    // `Error::DockerContainerWaitError`.
                    //
                    // Essentially, getting to this point should imply successfully exiting, but the API isn't very
                    // ergonomic in that regard, so we're just making sure.
                    assert_eq!(response.error, None);
                    assert_eq!(response.status_code, 0);

                    Ok(ExitStatus::Success)
                }

                Err(Error::DockerContainerWaitError { error, code }) => {
                    let error = if error.is_empty() {
                        String::from("<no error message provided>")
                    } else {
                        error
                    };
                    Ok(ExitStatus::Failed { code, error })
                }

                Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
            },
            None => unreachable!("Docker wait stream ended unexpectedly."),
        }
    }

    /// Waits for the container to finish.
    ///
    /// The container's exit status is returned, indicating success (exit code 0) or failure (exit code != 0), including
    /// any error message related to the failure.
    ///
    /// # Errors
    ///
    /// If an error is encountered while waiting for the container to exit, it will be returned.
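    ///
    /// # Example
    ///
    /// A sketch of acting on the returned status:
    ///
    /// ```ignore
    /// match driver.wait_for_container_exit().await? {
    ///     ExitStatus::Success => println!("container finished cleanly"),
    ///     status @ ExitStatus::Failed { .. } => eprintln!("container failed: {}", status),
    /// }
    /// ```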
    pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Waiting for container '{}' to finish...",
            &self.container_name
        );

        let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Container '{}' finished.",
            &self.container_name
        );

        Ok(exit_status)
    }

    async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
        self.docker.stop_container(container_name, None).await?;
        self.docker.remove_container(container_name, None).await?;

        Ok(())
    }

    /// Cleans up the container, stopping and removing it from the system.
    ///
    /// # Errors
    ///
    /// If there is an error while stopping or removing the container, it will be returned.
    pub async fn cleanup(self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Cleaning up container '{}'...",
            self.container_name
        );

        let start = Instant::now();

        self.cleanup_inner(&self.container_name).await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Container '{}' removed after {:?}.",
            self.container_name,
            start.elapsed()
        );

        Ok(())
    }

    async fn capture_container_logs(
        &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
    ) -> Result<(), GenericError> {
        // Make sure the directories exist first and prepare the files, just to get any permissions issues out of the
        // way up front before we spawn our background task.
        tokio::fs::create_dir_all(&container_log_dir)
            .await
            .error_context("Failed to create logs directory. Possible permissions issue.")?;

        let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
        let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));

        let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
            .await
            .map(BufWriter::new)
            .error_context("Failed to create standard output log file. Possible permissions issue.")?;
        let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
            .await
            .map(BufWriter::new)
            .error_context("Failed to create standard error log file. Possible permissions issue.")?;

        // Spawn a background task to capture the logs.
        let logs_config = LogsOptions {
            follow: true,
            stdout: true,
            stderr: true,
            ..Default::default()
        };
        let mut log_stream = self.docker.logs::<String>(container_name, Some(logs_config));

        tokio::spawn(async move {
            while let Some(log_result) = log_stream.next().await {
                match log_result {
                    Ok(log) => match log {
                        LogOutput::StdErr { message } => {
                            if let Err(e) = stderr_file.write_all(&message[..]).await {
                                error!(error = %e, "Failed to write log line to standard error log file.");
                                break;
                            }
                            if let Err(e) = stderr_file.flush().await {
                                error!(error = %e, "Failed to flush standard error log file.");
                                break;
                            }
                        }
                        LogOutput::StdOut { message } => {
                            if let Err(e) = stdout_file.write_all(&message[..]).await {
                                error!(error = %e, "Failed to write log line to standard output log file.");
                                break;
                            }
                            if let Err(e) = stdout_file.flush().await {
                                error!(error = %e, "Failed to flush standard output log file.");
                                break;
                            }
                        }
                        LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
                    },
                    Err(e) => {
                        error!(error = %e, "Failed to read log line from container.");
                        break;
                    }
                }
            }

            // One final fsync to ensure the logs are fully written to disk.
            if let Err(e) = stdout_file.get_mut().sync_all().await {
                error!(error = %e, "Failed to fsync standard output log file.");
            }

            if let Err(e) = stderr_file.get_mut().sync_all().await {
                error!(error = %e, "Failed to fsync standard error log file.");
            }
        });

        Ok(())
    }
}

fn get_alpine_container_image() -> String {
    // Normally, we would just use `alpine:latest` and let Docker figure out the registry to pull it from (i.e., Docker
    // Hub), but in CI, we don't have Docker Hub available to us, so we need to use an internal registry.
    //
    // Rather than threading through this information from the top level, we simply look for an override environment
    // variable here, which lets us specify the right image reference to use in CI, while allowing normal users to just
    // grab it from Docker Hub when running locally.
    std::env::var("GROUND_TRUTH_ALPINE_IMAGE").unwrap_or_else(|_| "alpine:latest".to_string())
}

fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
    let mut labels = HashMap::new();
    labels.insert("created_by".to_string(), "airlock".to_string());
    labels.insert("airlock-isolation-group".to_string(), isolation_group_id.to_string());
    labels
}