// airlock/driver.rs
1use std::{
2    collections::HashMap,
3    fmt,
4    path::{Path, PathBuf},
5    time::{Duration, Instant},
6};
7
8use bollard::{
9    container::LogOutput,
10    errors::Error,
11    exec::{CreateExecOptions, StartExecResults},
12    models::{
13        ContainerCreateBody, ContainerStateStatusEnum, HealthConfig, HealthStatusEnum, HostConfig, Ipam,
14        NetworkCreateRequest, VolumeCreateRequest,
15    },
16    query_parameters::{CreateContainerOptionsBuilder, CreateImageOptions, ListContainersOptionsBuilder, LogsOptions},
17    Docker,
18};
19use futures::{StreamExt as _, TryStreamExt as _};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use tokio::{
22    io::{AsyncWriteExt as _, BufWriter},
23    time::sleep,
24};
25use tracing::{debug, error, trace};
26
27use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
28
// Path inside the container where the millstone configuration file is bind-mounted.
const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";
// Path inside the container where the datadog-intake configuration file is bind-mounted.
const DATADOG_INTAKE_CONFIG_PATH_INTERNAL: &str = "/etc/datadog-intake/config.toml";
31
/// Final status of a container run.
///
/// Derives `Debug` so the status can be logged/inspected diagnostically; user-facing
/// formatting is provided by the `Display` implementation.
#[derive(Debug)]
pub enum ExitStatus {
    /// The container exited with a zero exit code.
    Success,
    /// The container exited with a non-zero exit code, along with any error message reported.
    Failed { code: i64, error: String },
}
36
37impl fmt::Display for ExitStatus {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            ExitStatus::Success => write!(f, "success (0)"),
41            ExitStatus::Failed { code, error } => write!(f, "failed (exit code: {}, error: {})", code, error),
42        }
43    }
44}
45
/// Driver configuration.
///
/// This is the basic set of configuration options needed to spawn the container for a given driver.
#[derive(Clone)]
pub struct DriverConfig {
    /// Short identifier for the driver (e.g. "millstone"); used in container names, the container
    /// hostname, and log fields.
    driver_id: &'static str,
    /// Container image reference to run.
    image: String,
    /// Optional entrypoint override; `None` uses the image default.
    entrypoint: Option<Vec<String>>,
    /// Optional command override; `None` uses the image default.
    command: Option<Vec<String>>,
    /// Environment variables in `KEY=VALUE` form.
    env: Vec<String>,
    /// Bind mounts in Docker `host:container[:opts]` form.
    binds: Vec<String>,
    /// Optional container healthcheck definition.
    healthcheck: Option<HealthConfig>,
    /// Ports to expose from the container, as (protocol, internal port) pairs.
    exposed_ports: Vec<(&'static str, u16)>,
}
60
61impl DriverConfig {
62    pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
63        // Ensure the given configuration file path actually exists.
64        match tokio::fs::metadata(&config.config_path).await {
65            Ok(metadata) if metadata.is_file() => {}
66            Ok(_) => {
67                return Err(generic_error!(
68                    "Specified millstone configuration path ({}) does not point to a file.",
69                    config.config_path.display()
70                ))
71            }
72            Err(e) => {
73                return Err(generic_error!(
74                    "Failed to ensure specified millstone configuration ({}) exists locally: {}",
75                    config.config_path.display(),
76                    e
77                ))
78            }
79        }
80
81        let millstone_binary_path = config
82            .binary_path
83            .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
84        let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
85
86        let driver_config = Self::from_image("millstone", config.image)
87            .with_entrypoint(entrypoint)
88            .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
89
90        Ok(driver_config)
91    }
92
93    pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
94        // Ensure the given configuration file path actually exists.
95        match tokio::fs::metadata(&config.config_path).await {
96            Ok(metadata) if metadata.is_file() => {}
97            Ok(_) => {
98                return Err(generic_error!(
99                    "Specified datadog-intake configuration ({}) does not point to a file.",
100                    config.config_path.display()
101                ))
102            }
103            Err(e) => {
104                return Err(generic_error!(
105                    "Failed to ensure specified datadog-intake configuration ({}) exists locally: {}",
106                    config.config_path.display(),
107                    e
108                ))
109            }
110        }
111
112        let datadog_intake_binary_path = config
113            .binary_path
114            .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
115        let entrypoint = vec![
116            datadog_intake_binary_path,
117            DATADOG_INTAKE_CONFIG_PATH_INTERNAL.to_string(),
118        ];
119
120        let driver_config = DriverConfig::from_image("datadog-intake", config.image)
121            .with_entrypoint(entrypoint)
122            .with_bind_mount(config.config_path, DATADOG_INTAKE_CONFIG_PATH_INTERNAL)
123            // Map our intake port to an ephemeral port on the host side, which we'll query once the container has been
124            // started so that we can connect to it.
125            .with_exposed_port("tcp", 2049);
126
127        Ok(driver_config)
128    }
129
130    pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
131        let driver_config = DriverConfig::from_image(target_id, config.image)
132            .with_entrypoint(config.entrypoint)
133            .with_command(config.command)
134            .with_env_vars(config.additional_env_vars);
135
136        Ok(driver_config)
137    }
138
139    /// Creates a new `DriverConfig` from the given driver identifier and container image reference.
140    fn from_image(driver_id: &'static str, image: String) -> Self {
141        Self {
142            driver_id,
143            image,
144            entrypoint: None,
145            command: None,
146            env: vec![],
147            binds: vec![],
148            healthcheck: None,
149            exposed_ports: vec![],
150        }
151    }
152
153    /// Sets the entrypoint for the container.
154    ///
155    /// If `entrypoint` is empty, the default entrypoint will be used.
156    pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
157        if !entrypoint.is_empty() {
158            self.entrypoint = Some(entrypoint);
159        }
160        self
161    }
162
163    /// Sets the command for the container.
164    ///
165    /// If `command` is empty, the default command will be used.
166    pub fn with_command(mut self, command: Vec<String>) -> Self {
167        if !command.is_empty() {
168            self.command = Some(command);
169        }
170        self
171    }
172
173    /// Adds an environment variable to the container.
174    pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
175    where
176        K: AsRef<str>,
177        V: AsRef<str>,
178    {
179        self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
180        self
181    }
182
183    /// Adds environment variables to the container.
184    pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
185        self.env.extend(env);
186        self
187    }
188
189    /// Adds a bind mount to the container.
190    ///
191    /// `host_path` represents the path on the host to mount, while `container_path` represents the path on the
192    /// container side to mount it to. Bind mounts can be either files or directories.
193    pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
194    where
195        HP: AsRef<Path>,
196        CP: AsRef<Path>,
197    {
198        let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
199        self.binds.push(bind_mount);
200        self
201    }
202
203    /// Adds a read-only bind mount to the container.
204    ///
205    /// Same as [`with_bind_mount`][Self::with_bind_mount] but the container cannot modify the mounted path.
206    pub fn with_readonly_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
207    where
208        HP: AsRef<Path>,
209        CP: AsRef<Path>,
210    {
211        let bind_mount = format!(
212            "{}:{}:ro",
213            host_path.as_ref().display(),
214            container_path.as_ref().display()
215        );
216        self.binds.push(bind_mount);
217        self
218    }
219
220    /// Sets the healthcheck for the container.
221    pub fn with_healthcheck(
222        mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
223        start_period: Duration, start_interval: Duration,
224    ) -> Self {
225        // We manually insert "CMD" as the first value in the command array, so that it doesn't have to be done by the
226        // caller, since it's some goofy ass syntax to have to know about.
227        test_command.insert(0, "CMD".to_string());
228
229        self.healthcheck = Some(HealthConfig {
230            test: Some(test_command),
231            interval: Some(interval.as_nanos() as i64),
232            timeout: Some(timeout.as_nanos() as i64),
233            retries: Some(retries),
234            start_period: Some(start_period.as_nanos() as i64),
235            start_interval: Some(start_interval.as_nanos() as i64),
236        });
237        self
238    }
239
240    /// Adds an exposed port to the container.
241    ///
242    /// Exposed ports are ports mapped from inside the container to an ephemeral port on the host. The `protocol` should
243    /// be either `tcp` or `udp`. A port on the host side is picked from the "local" port range. For example, on Linux
244    /// the range is defined by `/proc/sys/net/ipv4/ip_local_port_range`.
245    ///
246    /// When starting the driver via [`Driver::start`][crate::driver::Driver::start], the ephemeral port mappings will
247    /// be returned in [`DriverDetails`].
248    pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
249        self.exposed_ports.push((protocol, internal_port));
250        self
251    }
252}
253
/// Detailed information about the spawned container.
#[derive(Debug, Default)]
pub struct DriverDetails {
    /// Name of the started container.
    container_name: String,
    /// Mapping of `"<internal port>/<protocol>"` to the ephemeral host port, populated only when
    /// ports were exposed.
    port_mappings: Option<HashMap<String, u16>>,
}
260
261impl DriverDetails {
262    /// Returns the name of the container.
263    pub fn container_name(&self) -> &str {
264        &self.container_name
265    }
266
267    /// Attempts to look up a mapped ephemeral port for the given exposed port.
268    ///
269    /// The same `protocol` and internal port values used to expose the port must be used here. If the given
270    /// protocol/port combination was not exposed, `None` is returned. Otherwise, the mapped ephemeral port is returned.
271    /// This port is exposed on `0.0.0.0` on the host side.
272    pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
273        self.port_mappings
274            .as_ref()
275            .and_then(|port_mappings| port_mappings.get(&format!("{}/{}", internal_port, protocol)).copied())
276    }
277}
278
/// Container driver.
pub struct Driver {
    /// Unique identifier shared by all drivers in the same isolation group.
    isolation_group_id: String,
    /// Name used for the group's shared resources (network, volume): `airlock-<group id>`.
    isolation_group_name: String,
    /// Name of this driver's container: `airlock-<group id>-<driver id>`.
    container_name: String,
    /// Container configuration for this driver.
    config: DriverConfig,
    /// Docker API client.
    docker: Docker,
    /// When set, container logs are captured into this directory.
    log_dir: Option<PathBuf>,
}
288
289impl Driver {
290    /// Creates a new `Driver` from the given isolation group ID and configuration.
291    ///
292    /// # Isolation group
293    ///
294    /// The isolation group ID serves as a unique identifier to be used for both the name of the container as well as
295    /// the shared resources that are created and attached to the container. If two drivers share the same isolation
296    /// group ID, the containers they spawn will be located in the same network namespace, have access to the same
297    /// shared Airlock volume, etc.
298    ///
299    /// # Shared volume
300    ///
301    /// The container will have a volume bind-mounted at `/airlock` that is shared between all containers in the same
302    /// isolation group. This volume is mounted as world writeable (777) so all containers can freely read and write to
303    /// it. This makes it easier for containers to share data between one another, but also means that care should be
304    /// taken to avoid conflicts between trying to write to the same file, etc.
305    ///
306    /// # Errors
307    ///
308    /// If the Docker client cannot be created/configured, an error will be returned.
309    pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
310        let docker = crate::docker::connect()?;
311
312        Ok(Self {
313            isolation_group_name: format!("airlock-{}", isolation_group_id),
314            container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
315            isolation_group_id,
316            config,
317            docker,
318            log_dir: None,
319        })
320    }
321
322    /// Configures the driver to capture container logs.
323    ///
324    /// The logs will be stored in the given directory, under a subdirectory named after the isolation group ID. Each
325    /// container will get a log for standard output and standard error, following the pattern of `<container
326    /// name>.[stdout|stderr].log`.
327    pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
328        self.log_dir = Some(log_dir);
329        self
330    }
331
    /// Returns the string identifier of the driver.
    ///
    /// This is generally a shorthand of the application/service, such as `dogstatsd` or `millstone`.
    /// The same value is used as the container hostname and in log fields.
    pub fn driver_id(&self) -> &'static str {
        self.config.driver_id
    }
338
339    /// Clean up any containers, networks, and volumes related to the given isolation group ID.
340    ///
341    /// This is a free function to facilitate cleaning up resources after a number of drivers are run.
342    ///
343    /// # Errors
344    ///
345    /// If the Docker client cannot be created/configured, or there is an error when finding or removing any of the
346    /// related resources, an error will be returned.
347    pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
348        let docker = crate::docker::connect()?;
349
350        let isolation_group_name = format!("airlock-{}", isolation_group_id);
351        let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
352
353        // Remove any containers related to the isolation group. We do so forcefully.
354        let list_filters: HashMap<&str, Vec<&str>> =
355            [("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
356                .into_iter()
357                .collect();
358        let list_options = Some(
359            ListContainersOptionsBuilder::default()
360                .all(true)
361                .filters(&list_filters)
362                .build(),
363        );
364        let containers = docker.list_containers(list_options).await.with_error_context(|| {
365            format!(
366                "Failed to list containers attached to isolation group '{}'.",
367                isolation_group_id
368            )
369        })?;
370
371        for container in containers {
372            let container_name = match container.id {
373                Some(id) => id,
374                None => {
375                    debug!("Listed container had no ID. Skipping removal.");
376                    continue;
377                }
378            };
379
380            if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
381                error!(error = %e, "Failed to stop container '{}'.", container_name);
382                continue;
383            } else {
384                debug!("Stopped container '{}'.", container_name);
385            }
386
387            if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
388                error!(error = %e, "Failed to remove container '{}'.", container_name);
389                continue;
390            } else {
391                debug!("Removed container '{}'.", container_name);
392            }
393        }
394
395        // Remove the shared volume.
396        if let Err(e) = docker
397            .remove_volume(
398                isolation_group_name.as_str(),
399                None::<bollard::query_parameters::RemoveVolumeOptions>,
400            )
401            .await
402        {
403            error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
404        } else {
405            debug!("Removed shared volume '{}'.", isolation_group_name);
406        }
407
408        // Remove the network.
409        if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
410            error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
411        } else {
412            debug!("Removed shared network '{}'.", isolation_group_name);
413        }
414
415        Ok(())
416    }
417
418    async fn create_network_if_missing(&self) -> Result<(), GenericError> {
419        // See if the network already exists or not.
420        let networks = self.docker.list_networks(None).await?;
421        if networks
422            .iter()
423            .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
424        {
425            debug!("Network '{}' already exists.", self.isolation_group_name);
426            return Ok(());
427        }
428
429        debug!(
430            driver_id = self.config.driver_id,
431            isolation_group = self.isolation_group_id,
432            "Network '{}' does not exist. Creating...",
433            self.isolation_group_name
434        );
435
436        // Create the network since it doesn't yet exist.
437        let network_options = NetworkCreateRequest {
438            name: self.isolation_group_name.clone(),
439            driver: Some("bridge".to_string()),
440            ipam: Some(Ipam::default()),
441            enable_ipv6: Some(false),
442            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
443            ..Default::default()
444        };
445        let response = self.docker.create_network(network_options).await?;
446        debug!(
447            driver_id = self.config.driver_id,
448            isolation_group = self.isolation_group_id,
449            "Created network '{}' (ID: {:?}).",
450            self.isolation_group_name,
451            response.id
452        );
453
454        Ok(())
455    }
456
457    async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
458        let image_options = CreateImageOptions {
459            from_image: Some(image.to_string()),
460            ..Default::default()
461        };
462
463        let mut create_stream = self.docker.create_image(Some(image_options), None, None);
464        while let Some(info) = create_stream.next().await {
465            trace!(
466                driver_id = self.config.driver_id,
467                isolation_group = self.isolation_group_id,
468                image,
469                "Received image pull update: {:?}",
470                info
471            );
472        }
473
474        Ok(())
475    }
476
    /// Pulls this driver's configured container image, logging before and after.
    ///
    /// Delegates the actual pull to `create_image_if_missing_inner`.
    async fn create_image_if_missing(&self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Pulling image '{}'...",
            self.config.image
        );

        self.create_image_if_missing_inner(self.config.image.as_str()).await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Pulled image '{}'.",
            self.config.image
        );

        Ok(())
    }
496
497    async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
498        // Check to see if the shared volume already exists.
499        let volumes = self
500            .docker
501            .list_volumes(None::<bollard::query_parameters::ListVolumesOptions>)
502            .await?;
503        if volumes
504            .volumes
505            .iter()
506            .flatten()
507            .any(|volume| volume.name == self.isolation_group_name.as_str())
508        {
509            debug!("Shared volume '{}' already exists.", self.isolation_group_name);
510            return Ok(());
511        }
512
513        debug!(
514            driver_id = self.config.driver_id,
515            isolation_group = self.isolation_group_id,
516            "Shared volume '{}' does not exist. Creating...",
517            self.isolation_group_name
518        );
519
520        let volume_options = VolumeCreateRequest {
521            name: Some(self.isolation_group_name.clone()),
522            driver: Some("local".to_string()),
523            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
524            ..Default::default()
525        };
526        self.docker.create_volume(volume_options).await?;
527
528        debug!(
529            driver_id = self.config.driver_id,
530            isolation_group = self.isolation_group_id,
531            "Created shared volume '{}'.",
532            self.isolation_group_name
533        );
534
535        Ok(())
536    }
537
538    async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
539        debug!(
540            driver_id = self.config.driver_id,
541            isolation_group = self.isolation_group_id,
542            "Adjusting permissions on shared volume '{}'...",
543            self.container_name
544        );
545
546        // We spin up a minimal Alpine container, chmod the directory bind-mounted to the shared volume, and that's it.
547        let image = get_alpine_container_image();
548        self.create_image_if_missing_inner(&image).await?;
549
550        let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
551        let entrypoint = vec![
552            "chmod".to_string(),
553            "-R".to_string(),
554            "777".to_string(),
555            "/airlock".to_string(),
556        ];
557        let _ = self
558            .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
559            .await?;
560
561        self.start_container_inner(&container_name).await?;
562        self.wait_for_container_exit_inner(&container_name).await?;
563        self.cleanup_inner(&container_name).await?;
564
565        Ok(())
566    }
567
568    async fn create_container_inner(
569        &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
570        mut binds: Vec<String>, env: Option<Vec<String>>,
571    ) -> Result<String, GenericError> {
572        // Take the configured binds for the container and add in our shared volume that all containers in this
573        // isolation group will use.
574        binds.push(format!("{}:/airlock:z", self.isolation_group_name));
575
576        // Map specific host-level paths into the containers so they can access host-level resources needed for origin
577        // detection.
578        binds.push("/proc:/host/proc:ro".to_string());
579        binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
580        binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
581
582        let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
583            (None, None)
584        } else {
585            let exposed_ports: Vec<String> = self
586                .config
587                .exposed_ports
588                .iter()
589                .map(|(protocol, internal_port)| format!("{}/{}", internal_port, protocol))
590                .collect();
591            (Some(true), Some(exposed_ports))
592        };
593
594        let container_config = ContainerCreateBody {
595            hostname: Some(self.config.driver_id.to_string()),
596            env,
597            image: Some(image),
598            entrypoint,
599            cmd,
600            host_config: Some(HostConfig {
601                binds: Some(binds),
602                network_mode: Some(self.isolation_group_name.clone()),
603                publish_all_ports,
604                pid_mode: Some("host".to_string()),
605                ..Default::default()
606            }),
607            healthcheck: self.config.healthcheck.clone(),
608            exposed_ports,
609            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
610            ..Default::default()
611        };
612
613        let create_options = CreateContainerOptionsBuilder::default().name(&container_name).build();
614
615        let response = self
616            .docker
617            .create_container(Some(create_options), container_config)
618            .await?;
619
620        Ok(response.id)
621    }
622
    /// Creates this driver's container from its configured image, entrypoint, command, binds, and
    /// environment variables, logging the resulting container ID.
    async fn create_container(&self) -> Result<(), GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Creating container '{}'...",
            self.container_name
        );

        let container_id = self
            .create_container_inner(
                self.container_name.clone(),
                self.config.image.clone(),
                self.config.entrypoint.clone(),
                self.config.command.clone(),
                self.config.binds.clone(),
                Some(self.config.env.clone()),
            )
            .await?;

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Created container '{}' (ID: {}).",
            self.container_name,
            container_id
        );

        Ok(())
    }
652
653    async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
654        self.docker.start_container(container_name, None).await?;
655
656        let mut details = DriverDetails {
657            container_name: container_name.to_string(),
658            ..Default::default()
659        };
660
661        let response = self.docker.inspect_container(container_name, None).await?;
662        if let Some(network_settings) = response.network_settings {
663            if let Some(ports) = network_settings.ports {
664                let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
665                for (internal_port, bindings) in ports {
666                    if let Some(bindings) = bindings {
667                        for binding in bindings {
668                            if let Some(host_ip) = binding.host_ip.as_deref() {
669                                if host_ip == "0.0.0.0" {
670                                    let maybe_host_port =
671                                        binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());
672
673                                    if let Some(host_port) = maybe_host_port {
674                                        port_mappings.insert(internal_port, host_port);
675                                        break;
676                                    }
677                                }
678                            }
679                        }
680                    }
681                }
682            }
683        }
684
685        Ok(details)
686    }
687
    /// Starts this driver's container, attaching log capture if configured via `with_logging`.
    async fn start_container(&self) -> Result<DriverDetails, GenericError> {
        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Starting container '{}'...",
            self.container_name
        );

        let details = self.start_container_inner(&self.container_name).await?;

        // Begin capturing stdout/stderr as soon as the container is up, if a log directory was set.
        if let Some(log_dir) = self.log_dir.clone() {
            debug!(
                "Capturing logs for container '{}' to {}...",
                self.container_name,
                log_dir.display()
            );

            self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
                .await?;
        }

        debug!(
            driver_id = self.config.driver_id,
            isolation_group = self.isolation_group_id,
            "Started container '{}'.",
            self.container_name
        );

        Ok(details)
    }
718
    /// Starts the container, creating any necessary resources.
    ///
    /// # Errors
    ///
    /// If there is an error while creating the network or shared volume, while pulling the container image, or while
    /// creating or starting the container, it will be returned.
    pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
        // Bring up the per-isolation-group shared resources first (idempotent if already present)...
        self.create_network_if_missing().await?;
        self.create_image_if_missing().await?;
        self.create_volume_if_missing().await?;
        self.adjust_shared_volume_permissions().await?;

        // ...then create and start this driver's own container.
        self.create_container().await?;
        self.start_container().await
    }
734
    /// Waits until the container is marked as healthy.
    ///
    /// If the container has no health checks defined, this returns early and does no waiting.
    ///
    /// This polls the container state once per second and will wait indefinitely while the
    /// container is running but not yet healthy; callers are responsible for applying any timeout.
    ///
    /// # Errors
    ///
    /// If there is an error while inspecting the container, it will be returned.
    pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
        loop {
            // Inspect the container, and see if it even has any health checks defined. If not, then we can return early.
            let response = self.docker.inspect_container(&self.container_name, None).await?;
            let state = response
                .state
                .ok_or_else(|| generic_error!("Container state should be present."))?;

            // Make sure the container is actually running.
            let status = state
                .status
                .ok_or_else(|| generic_error!("Container status should be present."))?;
            if status != ContainerStateStatusEnum::RUNNING {
                return Err(generic_error!(
                    "Container exited unexpectedly (driver_id: {}, container: {}). Check logs in the test run directory.",
                    self.config.driver_id,
                    self.container_name
                ));
            }

            if let Some(health_status) = state.health.and_then(|h| h.status) {
                match health_status {
                    // No healthcheck defined, or healthy, so we're good to go.
                    HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
                        debug!(
                            driver_id = self.config.driver_id,
                            "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
                        );
                        return Ok(());
                    }

                    // Not healthy yet, so we'll keep waiting.
                    HealthStatusEnum::STARTING | HealthStatusEnum::UNHEALTHY => {
                        debug!(
                            driver_id = self.config.driver_id,
                            "Container '{}' not yet healthy. Waiting...", &self.container_name
                        );
                    }
                }
            } else {
                // Health information is entirely absent: the container has no healthcheck at all.
                debug!(
                    driver_id = self.config.driver_id,
                    "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
                );
                return Ok(());
            }

            // Wait for a second and then check again.
            sleep(Duration::from_secs(1)).await;
        }
    }
793
794    async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
795        let mut wait_stream = self.docker.wait_container(container_name, None);
796        match wait_stream.next().await {
797            Some(result) => match result {
798                Ok(response) => {
799                    // When the exit code is non-zero, `bollard` transforms the normal `ContainerWaitResponse` into
800                    // `Error::DockerContainerWaitError`, which is why we have these asserts here to catch any scenario
801                    // where there's _somehow_ an error condition being indicated without it having been transformed into
802                    // `Error::DockerContainerWaitError`.
803                    //
804                    // Essentially, getting to this point should imply successfully exiting, but the API isn't very
805                    // ergonomic in that regard, so we're just making sure.
806                    assert_eq!(response.error, None);
807                    assert_eq!(response.status_code, 0);
808
809                    Ok(ExitStatus::Success)
810                }
811
812                Err(Error::DockerContainerWaitError { error, code }) => {
813                    let error = if error.is_empty() {
814                        String::from("<no error message provided>")
815                    } else {
816                        error
817                    };
818                    Ok(ExitStatus::Failed { code, error })
819                }
820
821                Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
822            },
823            None => unreachable!("Docker wait stream ended unexpectedly."),
824        }
825    }
826
827    /// Waits for the container to finish successfully.
828    ///
829    /// The container's exit status is returned, indicating success (exit code 0) or failure (exit code != 0), including
830    /// any error message related to the failure.
831    ///
832    /// # Errors
833    ///
834    /// If an error is encountered while waiting for the container to exit, it will be returned.
835    pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
836        debug!(
837            driver_id = self.config.driver_id,
838            isolation_group = self.isolation_group_id,
839            "Waiting for container '{}' to finish...",
840            &self.container_name
841        );
842
843        let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
844
845        debug!(
846            driver_id = self.config.driver_id,
847            isolation_group = self.isolation_group_id,
848            "Container '{}' finished successfully.",
849            &self.container_name
850        );
851
852        Ok(exit_status)
853    }
854
855    /// Executes a command inside the running container and returns its stdout.
856    ///
857    /// The command runs as root with no TTY. Stderr is discarded — only stdout is returned. If the command exits with a
858    /// nonzero status, an error is returned.
859    ///
860    /// # Errors
861    ///
862    /// If the exec creation, start, output collection, or command exit code indicates failure, an error is returned.
863    pub async fn exec_in_container(&self, cmd: Vec<String>) -> Result<String, GenericError> {
864        let exec_opts = CreateExecOptions {
865            attach_stdout: Some(true),
866            attach_stderr: Some(false),
867            cmd: Some(cmd.clone()),
868            ..Default::default()
869        };
870
871        let exec = self
872            .docker
873            .create_exec(&self.container_name, exec_opts)
874            .await
875            .with_error_context(|| format!("Failed to create exec instance for container {}.", self.container_name))?;
876
877        let exec_id = exec.id.clone();
878
879        let output = self
880            .docker
881            .start_exec(&exec.id, None)
882            .await
883            .with_error_context(|| format!("Failed to start exec for container {}.", self.container_name))?;
884
885        let mut stdout = String::new();
886        if let StartExecResults::Attached { mut output, .. } = output {
887            while let Some(chunk) = output.try_next().await? {
888                if let LogOutput::StdOut { message } = chunk {
889                    stdout.push_str(&String::from_utf8_lossy(&message));
890                }
891            }
892        }
893
894        // Check the command's exit code.
895        let inspect = self
896            .docker
897            .inspect_exec(&exec_id)
898            .await
899            .error_context("Failed to inspect exec result.")?;
900
901        if let Some(code) = inspect.exit_code {
902            if code != 0 {
903                return Err(generic_error!(
904                    "Command {:?} exited with code {} in container {}.",
905                    cmd,
906                    code,
907                    self.container_name
908                ));
909            }
910        }
911
912        Ok(stdout)
913    }
914
915    async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
916        self.docker.stop_container(container_name, None).await?;
917        self.docker.remove_container(container_name, None).await?;
918
919        Ok(())
920    }
921
922    /// Cleans up the container, stopping and removing it from the system.
923    ///
924    /// # Errors
925    ///
926    /// If there is an error while stopping or removing the container, it will be returned.
927    pub async fn cleanup(self) -> Result<(), GenericError> {
928        debug!(
929            driver_id = self.config.driver_id,
930            isolation_group = self.isolation_group_id,
931            "Cleaning up container '{}'...",
932            self.container_name
933        );
934
935        let start = Instant::now();
936
937        self.cleanup_inner(&self.container_name).await?;
938
939        debug!(
940            driver_id = self.config.driver_id,
941            isolation_group = self.isolation_group_id,
942            "Container '{}' removed after {:?}.",
943            self.container_name,
944            start.elapsed()
945        );
946
947        Ok(())
948    }
949
    /// Captures the container's stdout/stderr streams to log files on disk.
    ///
    /// Creates `container_log_dir` (and parents) if needed, then writes `<log_name>.stdout.log` and
    /// `<log_name>.stderr.log` inside it. The actual streaming happens in a detached background task, so this
    /// method returns as soon as the files are created and the log stream is attached.
    ///
    /// # Errors
    ///
    /// If the log directory or either log file cannot be created (e.g., permissions), an error is returned.
    async fn capture_container_logs(
        &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
    ) -> Result<(), GenericError> {
        // Make sure the directories exist first and prepare the files, just to get any permissions issues out of the
        // way up front before we spawn our background task.
        tokio::fs::create_dir_all(&container_log_dir)
            .await
            .error_context("Failed to create logs directory. Possible permissions issue.")?;

        let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
        let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));

        // Buffered writers keep per-chunk writes cheap; we flush explicitly after each chunk below.
        let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
            .await
            .map(BufWriter::new)
            .error_context("Failed to create standard output log file. Possible permissions issue.")?;
        let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
            .await
            .map(BufWriter::new)
            .error_context("Failed to create standard error log file. Possible permissions issue.")?;

        // Spawn a background task to capture the logs. `follow: true` asks Docker to keep streaming new
        // output (like `docker logs --follow`), so the task below stays alive until the stream closes.
        let logs_config = LogsOptions {
            follow: true,
            stdout: true,
            stderr: true,
            ..Default::default()
        };
        let mut log_stream = self.docker.logs(container_name, Some(logs_config));

        // NOTE: the task handle is intentionally dropped (detached); the task ends on its own when the log
        // stream closes or a write/flush fails.
        tokio::spawn(async move {
            while let Some(log_result) = log_stream.next().await {
                match log_result {
                    Ok(log) => match log {
                        LogOutput::StdErr { message } => {
                            if let Err(e) = stderr_file.write_all(&message[..]).await {
                                error!(error = %e, "Failed to write log line to standard error log file.");
                                break;
                            }
                            // Flush after every chunk so the on-disk logs stay current while the container runs.
                            if let Err(e) = stderr_file.flush().await {
                                error!(error = %e, "Failed to flush standard error log file.");
                                break;
                            }
                        }
                        LogOutput::StdOut { message } => {
                            if let Err(e) = stdout_file.write_all(&message[..]).await {
                                error!(error = %e, "Failed to write log line to standard output log file.");
                                break;
                            }
                            if let Err(e) = stdout_file.flush().await {
                                error!(error = %e, "Failed to flush standard output log file.");
                                break;
                            }
                        }
                        // Stdin/console frames aren't useful for log capture, so they're ignored.
                        LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
                    },
                    Err(e) => {
                        error!(error = %e, "Failed to read log line from container.");
                        break;
                    }
                }
            }

            // One final fsync to ensure the logs are fully written to disk.
            if let Err(e) = stdout_file.get_mut().sync_all().await {
                error!(error = %e, "Failed to fsync standard output log file.");
            }

            if let Err(e) = stderr_file.get_mut().sync_all().await {
                error!(error = %e, "Failed to fsync standard error log file.");
            }
        });

        Ok(())
    }
1025}
1026
fn get_alpine_container_image() -> String {
    // Normally we would just use `alpine:latest` and let Docker resolve it from Docker Hub, but CI has no
    // access to Docker Hub and must pull from an internal registry instead.
    //
    // Rather than threading that information down from the top level, we honor an override environment
    // variable here: CI sets it to the right image reference, while local users simply fall back to
    // pulling from Docker Hub.
    match std::env::var("PANORAMIC_ALPINE_IMAGE") {
        Ok(image_ref) => image_ref,
        Err(_) => "alpine:latest".to_string(),
    }
}
1036
fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
    // Labels attached to every resource Airlock creates: who created it, and which isolation group it
    // belongs to, so resources can be identified (and cleaned up) later.
    HashMap::from([
        ("created_by".to_string(), "airlock".to_string()),
        ("airlock-isolation-group".to_string(), isolation_group_id.to_string()),
    ])
}