Skip to main content

airlock/
driver.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    path::{Path, PathBuf},
5    time::{Duration, Instant},
6};
7
8use bollard::{
9    container::{LogOutput, NetworkingConfig as ContainerNetworkingConfig},
10    errors::Error,
11    exec::{CreateExecOptions, StartExecResults},
12    models::{
13        ContainerCreateBody, ContainerStateStatusEnum, EndpointSettings, HealthConfig, HealthStatusEnum, HostConfig,
14        Ipam, NetworkConnectRequest, NetworkCreateRequest, VolumeCreateRequest,
15    },
16    query_parameters::{CreateContainerOptionsBuilder, CreateImageOptions, ListContainersOptionsBuilder, LogsOptions},
17    Docker,
18};
19use futures::{StreamExt as _, TryStreamExt as _};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use tokio::{
22    io::{AsyncWriteExt as _, BufWriter},
23    time::sleep,
24};
25use tracing::{debug, error, trace};
26
27use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
28
/// Path inside the millstone container where the millstone configuration file is bind-mounted.
const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";

/// Time between consecutive healthcheck runs for the `datadog-intake` container.
const DATADOG_INTAKE_HEALTHCHECK_INTERVAL: Duration = Duration::from_secs(1);

/// Maximum time a single healthcheck run may take for the `datadog-intake` container.
const DATADOG_INTAKE_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(1);

/// Healthcheck retry count passed to Docker for the `datadog-intake` container.
const DATADOG_INTAKE_HEALTHCHECK_RETRIES: i64 = 30;

/// Healthcheck start period passed to Docker for the `datadog-intake` container.
const DATADOG_INTAKE_HEALTHCHECK_START_PERIOD: Duration = Duration::from_secs(1);

/// Healthcheck start interval passed to Docker for the `datadog-intake` container.
const DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL: Duration = Duration::from_secs(1);

/// Shell snippet used as the `datadog-intake` healthcheck.
///
/// It opens a TCP connection to `127.0.0.1:2049` through the shell's `/dev/tcp` pseudo-device, sends a
/// `GET /ready` request over it, and succeeds only if the response contains `200 OK`.
const DATADOG_INTAKE_HEALTHCHECK_COMMAND: &str = "exec 3<>/dev/tcp/127.0.0.1/2049 && \
    printf 'GET /ready HTTP/1.1\\r\\nHost: localhost\\r\\nConnection: close\\r\\n\\r\\n' >&3 && \
    grep -q '200 OK' <&3";
40
/// Exit status with two outcomes: success, or failure with details.
pub enum ExitStatus {
    /// Successful exit; displayed as `success (0)`.
    Success,

    /// Failed exit, carrying the exit code and an error description.
    Failed { code: i64, error: String },
}

impl fmt::Display for ExitStatus {
    /// Writes a short human-readable description of the status.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Failed { code, error } => {
                write!(f, "failed (exit code: {}, error: {})", code, error)
            }
            Self::Success => f.write_str("success (0)"),
        }
    }
}
54
55/// Driver configuration.
56///
57/// This is the basic set of configuration options needed to spawn the container for a given driver.
58#[derive(Clone)]
59pub struct DriverConfig {
60    driver_id: &'static str,
61    image: String,
62    entrypoint: Option<Vec<String>>,
63    command: Option<Vec<String>>,
64    env: Vec<String>,
65    binds: Vec<String>,
66    healthcheck: Option<HealthConfig>,
67    exposed_ports: Vec<(&'static str, u16)>,
68    /// Additional named Docker volume mounts, in `volume_name:/container/path` format.
69    ///
70    /// Unlike bind mounts specified via [`with_bind_mount`][Self::with_bind_mount], these reference
71    /// existing named Docker volumes rather than host filesystem paths. Used to mount volumes that
72    /// belong to other isolation groups (for example, a shared millstone mounting both the baseline
73    /// and comparison agent volumes).
74    additional_volume_mounts: Vec<String>,
75
76    /// DNS aliases for this container on its primary network.
77    ///
78    /// Set via `NetworkingConfig.EndpointsConfig` at container creation time. Other containers on
79    /// the same network can reach this container using any of these aliases in addition to its
80    /// hostname. Used to give agent containers unambiguous names (for example, `"baseline"`, `"comparison"`)
81    /// that the shared millstone can use to address each one independently.
82    network_aliases: Vec<String>,
83
84    /// Additional Docker networks to connect this container to after creation.
85    ///
86    /// The primary network is set via `HostConfig.NetworkMode`. Each network listed here is joined
87    /// via a separate `docker network connect` call after the container is created but before it's
88    /// started. Used to connect the shared millstone container to both agent networks so it can
89    /// reach `baseline` and `comparison` by hostname.
90    additional_networks: Vec<String>,
91}
92
93impl DriverConfig {
94    pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
95        // Ensure the given configuration file path actually exists.
96        match tokio::fs::metadata(&config.config_path).await {
97            Ok(metadata) if metadata.is_file() => {}
98            Ok(_) => {
99                return Err(generic_error!(
100                    "Specified millstone configuration path ({}) does not point to a file.",
101                    config.config_path.display()
102                ))
103            }
104            Err(e) => {
105                return Err(generic_error!(
106                    "Failed to ensure specified millstone configuration ({}) exists locally: {}",
107                    config.config_path.display(),
108                    e
109                ))
110            }
111        }
112
113        let millstone_binary_path = config
114            .binary_path
115            .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
116        let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
117
118        let driver_config = Self::from_image("millstone", config.image)
119            .with_entrypoint(entrypoint)
120            .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
121
122        Ok(driver_config)
123    }
124
125    pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
126        let datadog_intake_binary_path = config
127            .binary_path
128            .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
129        let entrypoint = vec![datadog_intake_binary_path];
130
131        let driver_config = DriverConfig::from_image("datadog-intake", config.image)
132            .with_entrypoint(entrypoint)
133            .with_healthcheck(
134                vec![
135                    "/bin/bash".to_string(),
136                    "-c".to_string(),
137                    DATADOG_INTAKE_HEALTHCHECK_COMMAND.to_string(),
138                ],
139                DATADOG_INTAKE_HEALTHCHECK_INTERVAL,
140                DATADOG_INTAKE_HEALTHCHECK_TIMEOUT,
141                DATADOG_INTAKE_HEALTHCHECK_RETRIES,
142                DATADOG_INTAKE_HEALTHCHECK_START_PERIOD,
143                DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL,
144            )
145            // Map our intake port to an ephemeral port on the host side, which we'll query once the container has been
146            // started so that we can connect to it.
147            .with_exposed_port("tcp", 2049);
148
149        Ok(driver_config)
150    }
151
152    pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
153        let driver_config = DriverConfig::from_image(target_id, config.image)
154            .with_entrypoint(config.entrypoint)
155            .with_command(config.command)
156            .with_env_vars(config.additional_env_vars);
157
158        Ok(driver_config)
159    }
160
161    /// Creates a new `DriverConfig` from the given driver identifier and container image reference.
162    pub fn from_image(driver_id: &'static str, image: String) -> Self {
163        Self {
164            driver_id,
165            image,
166            entrypoint: None,
167            command: None,
168            env: vec![],
169            binds: vec![],
170            healthcheck: None,
171            exposed_ports: vec![],
172            additional_volume_mounts: vec![],
173            network_aliases: vec![],
174            additional_networks: vec![],
175        }
176    }
177
178    /// Sets the entrypoint for the container.
179    ///
180    /// If `entrypoint` is empty, the default entrypoint will be used.
181    pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
182        if !entrypoint.is_empty() {
183            self.entrypoint = Some(entrypoint);
184        }
185        self
186    }
187
188    /// Sets the command for the container.
189    ///
190    /// If `command` is empty, the default command will be used.
191    pub fn with_command(mut self, command: Vec<String>) -> Self {
192        if !command.is_empty() {
193            self.command = Some(command);
194        }
195        self
196    }
197
198    /// Adds an environment variable to the container.
199    pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
200    where
201        K: AsRef<str>,
202        V: AsRef<str>,
203    {
204        self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
205        self
206    }
207
208    /// Adds environment variables to the container.
209    pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
210        self.env.extend(env);
211        self
212    }
213
214    /// Adds a bind mount to the container.
215    ///
216    /// `host_path` represents the path on the host to mount, while `container_path` represents the path on the
217    /// container side to mount it to. Bind mounts can be either files or directories.
218    pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
219    where
220        HP: AsRef<Path>,
221        CP: AsRef<Path>,
222    {
223        let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
224        self.binds.push(bind_mount);
225        self
226    }
227
228    /// Adds a read-only bind mount to the container.
229    ///
230    /// Same as [`with_bind_mount`][Self::with_bind_mount] but the container can't modify the mounted path.
231    pub fn with_readonly_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
232    where
233        HP: AsRef<Path>,
234        CP: AsRef<Path>,
235    {
236        let bind_mount = format!(
237            "{}:{}:ro",
238            host_path.as_ref().display(),
239            container_path.as_ref().display()
240        );
241        self.binds.push(bind_mount);
242        self
243    }
244
245    /// Sets the healthcheck for the container.
246    pub fn with_healthcheck(
247        mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
248        start_period: Duration, start_interval: Duration,
249    ) -> Self {
250        // We manually insert "CMD" as the first value in the command array, so that it doesn't have to be done by the
251        // caller, since it's some goofy ass syntax to have to know about.
252        test_command.insert(0, "CMD".to_string());
253
254        self.healthcheck = Some(HealthConfig {
255            test: Some(test_command),
256            interval: Some(interval.as_nanos() as i64),
257            timeout: Some(timeout.as_nanos() as i64),
258            retries: Some(retries),
259            start_period: Some(start_period.as_nanos() as i64),
260            start_interval: Some(start_interval.as_nanos() as i64),
261        });
262        self
263    }
264
265    /// Adds a DNS alias for this container on its primary network.
266    ///
267    /// Other containers on the same network can resolve this container by `alias` in addition to
268    /// its hostname. Call this before the container is started.
269    pub fn with_network_alias(mut self, alias: impl Into<String>) -> Self {
270        self.network_aliases.push(alias.into());
271        self
272    }
273
274    /// Connects this container to an additional Docker network after creation.
275    ///
276    /// The primary network is always the container's isolation group network. Each network added
277    /// here is joined via `docker network connect` after the container is created but before it
278    /// is started, so the container is reachable on all listed networks from the moment it runs.
279    pub fn with_network(mut self, network: impl Into<String>) -> Self {
280        self.additional_networks.push(network.into());
281        self
282    }
283
284    /// Mounts a named Docker volume into the container at the given path.
285    ///
286    /// Unlike [`with_bind_mount`][Self::with_bind_mount], this references a named Docker volume
287    /// rather than a host filesystem path. The volume must already exist when the container starts.
288    /// This is useful for mounting volumes that belong to other isolation groups: for example,
289    /// a shared millstone container that needs to reach the DogStatsD sockets of both the baseline
290    /// and comparison agent containers.
291    pub fn with_volume_mount(mut self, volume_name: impl Into<String>, container_path: impl AsRef<Path>) -> Self {
292        self.additional_volume_mounts
293            .push(format!("{}:{}", volume_name.into(), container_path.as_ref().display()));
294        self
295    }
296
297    /// Adds an exposed port to the container.
298    ///
299    /// Exposed ports are ports mapped from inside the container to an ephemeral port on the host. The `protocol` should
300    /// be either `tcp` or `udp`. A port on the host side is picked from the "local" port range. For example, on Linux
301    /// the range is defined by `/proc/sys/net/ipv4/ip_local_port_range`.
302    ///
303    /// When starting the driver via [`Driver::start`][crate::driver::Driver::start], the ephemeral port mappings will
304    /// be returned in [`DriverDetails`].
305    pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
306        self.exposed_ports.push((protocol, internal_port));
307        self
308    }
309}
310
/// Detailed information about the spawned container.
#[derive(Debug, Default)]
pub struct DriverDetails {
    /// Name of the container these details belong to.
    container_name: String,

    /// Host-side ports keyed by `<internal port>/<protocol>` (for example, `2049/tcp`), if any were collected.
    port_mappings: Option<HashMap<String, u16>>,
}

impl DriverDetails {
    /// Returns the name of the container.
    pub fn container_name(&self) -> &str {
        self.container_name.as_str()
    }

    /// Attempts to look up a mapped ephemeral port for the given exposed port.
    ///
    /// The same `protocol` and internal port values used to expose the port must be used here. If the given
    /// protocol/port combination wasn't exposed, `None` is returned. Otherwise, the mapped ephemeral port is returned.
    /// This port is exposed on `0.0.0.0` on the host side.
    pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
        let port_mappings = self.port_mappings.as_ref()?;
        let key = format!("{}/{}", internal_port, protocol);
        port_mappings.get(&key).copied()
    }
}
335
336/// Container driver.
337pub struct Driver {
338    isolation_group_id: String,
339    isolation_group_name: String,
340    container_name: String,
341    config: DriverConfig,
342    docker: Docker,
343    log_dir: Option<PathBuf>,
344}
345
346impl Driver {
347    /// Creates a new `Driver` from the given isolation group ID and configuration.
348    ///
349    /// # Isolation group
350    ///
351    /// The isolation group ID serves as a unique identifier to be used for both the name of the container as well as
352    /// the shared resources that are created and attached to the container. If two drivers share the same isolation
353    /// group ID, the containers they spawn will be located in the same network namespace, have access to the same
354    /// shared Airlock volume, etc.
355    ///
356    /// # Shared volume
357    ///
358    /// The container will have a volume bind-mounted at `/airlock` that's shared between all containers in the same
359    /// isolation group. This volume is mounted as world writeable (777) so all containers can freely read and write to
360    /// it. This makes it easier for containers to share data between one another, but also means that care should be
361    /// taken to avoid conflicts between trying to write to the same file, etc.
362    ///
363    /// # Errors
364    ///
365    /// If the Docker client can't be created/configured, an error will be returned.
366    pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
367        let docker = crate::docker::connect()?;
368
369        Ok(Self {
370            isolation_group_name: format!("airlock-{}", isolation_group_id),
371            container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
372            isolation_group_id,
373            config,
374            docker,
375            log_dir: None,
376        })
377    }
378
379    /// Configures the driver to capture container logs.
380    ///
381    /// The logs will be stored in the given directory, under a subdirectory named after the isolation group ID. Each
382    /// container will get a log for standard output and standard error, following the pattern of `<container
383    /// name>.[stdout|stderr].log`.
384    pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
385        self.log_dir = Some(log_dir);
386        self
387    }
388
389    /// Returns the string identifier of the driver.
390    ///
391    /// This is generally a shorthand of the application/service, such as `dogstatsd` or `millstone`.
392    pub fn driver_id(&self) -> &'static str {
393        self.config.driver_id
394    }
395
396    /// Clean up any containers, networks, and volumes related to the given isolation group ID.
397    ///
398    /// This is a free function to facilitate cleaning up resources after a number of drivers are run.
399    ///
400    /// # Errors
401    ///
402    /// If the Docker client can't be created/configured, or there is an error when finding or removing any of the
403    /// related resources, an error will be returned.
404    pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
405        let docker = crate::docker::connect()?;
406
407        let isolation_group_name = format!("airlock-{}", isolation_group_id);
408        let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
409
410        // Remove any containers related to the isolation group. We do so forcefully.
411        let list_filters: HashMap<&str, Vec<&str>> =
412            [("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
413                .into_iter()
414                .collect();
415        let list_options = Some(
416            ListContainersOptionsBuilder::default()
417                .all(true)
418                .filters(&list_filters)
419                .build(),
420        );
421        let containers = docker.list_containers(list_options).await.with_error_context(|| {
422            format!(
423                "Failed to list containers attached to isolation group '{}'.",
424                isolation_group_id
425            )
426        })?;
427
428        for container in containers {
429            let container_name = match container.id {
430                Some(id) => id,
431                None => {
432                    debug!("Listed container had no ID. Skipping removal.");
433                    continue;
434                }
435            };
436
437            if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
438                error!(error = %e, "Failed to stop container '{}'.", container_name);
439                continue;
440            } else {
441                debug!("Stopped container '{}'.", container_name);
442            }
443
444            if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
445                error!(error = %e, "Failed to remove container '{}'.", container_name);
446                continue;
447            } else {
448                debug!("Removed container '{}'.", container_name);
449            }
450        }
451
452        // Remove the shared volume.
453        if let Err(e) = docker
454            .remove_volume(
455                isolation_group_name.as_str(),
456                None::<bollard::query_parameters::RemoveVolumeOptions>,
457            )
458            .await
459        {
460            error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
461        } else {
462            debug!("Removed shared volume '{}'.", isolation_group_name);
463        }
464
465        // Remove the network.
466        if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
467            error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
468        } else {
469            debug!("Removed shared network '{}'.", isolation_group_name);
470        }
471
472        Ok(())
473    }
474
475    async fn create_network_if_missing(&self) -> Result<(), GenericError> {
476        // See if the network already exists or not.
477        let networks = self.docker.list_networks(None).await?;
478        if networks
479            .iter()
480            .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
481        {
482            debug!("Network '{}' already exists.", self.isolation_group_name);
483            return Ok(());
484        }
485
486        debug!(
487            driver_id = self.config.driver_id,
488            isolation_group = self.isolation_group_id,
489            "Network '{}' does not exist. Creating...",
490            self.isolation_group_name
491        );
492
493        // Create the network since it doesn't yet exist.
494        let network_options = NetworkCreateRequest {
495            name: self.isolation_group_name.clone(),
496            driver: Some("bridge".to_string()),
497            ipam: Some(Ipam::default()),
498            enable_ipv6: Some(false),
499            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
500            ..Default::default()
501        };
502        let response = self.docker.create_network(network_options).await?;
503        debug!(
504            driver_id = self.config.driver_id,
505            isolation_group = self.isolation_group_id,
506            "Created network '{}' (ID: {:?}).",
507            self.isolation_group_name,
508            response.id
509        );
510
511        Ok(())
512    }
513
514    async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
515        let image_options = CreateImageOptions {
516            from_image: Some(image.to_string()),
517            ..Default::default()
518        };
519
520        let mut create_stream = self.docker.create_image(Some(image_options), None, None);
521        while let Some(info) = create_stream.next().await {
522            trace!(
523                driver_id = self.config.driver_id,
524                isolation_group = self.isolation_group_id,
525                image,
526                "Received image pull update: {:?}",
527                info
528            );
529        }
530
531        Ok(())
532    }
533
534    async fn create_image_if_missing(&self) -> Result<(), GenericError> {
535        debug!(
536            driver_id = self.config.driver_id,
537            isolation_group = self.isolation_group_id,
538            "Pulling image '{}'...",
539            self.config.image
540        );
541
542        self.create_image_if_missing_inner(self.config.image.as_str()).await?;
543
544        debug!(
545            driver_id = self.config.driver_id,
546            isolation_group = self.isolation_group_id,
547            "Pulled image '{}'.",
548            self.config.image
549        );
550
551        Ok(())
552    }
553
554    async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
555        // Check to see if the shared volume already exists.
556        let volumes = self
557            .docker
558            .list_volumes(None::<bollard::query_parameters::ListVolumesOptions>)
559            .await?;
560        if volumes
561            .volumes
562            .iter()
563            .flatten()
564            .any(|volume| volume.name == self.isolation_group_name.as_str())
565        {
566            debug!("Shared volume '{}' already exists.", self.isolation_group_name);
567            return Ok(());
568        }
569
570        debug!(
571            driver_id = self.config.driver_id,
572            isolation_group = self.isolation_group_id,
573            "Shared volume '{}' does not exist. Creating...",
574            self.isolation_group_name
575        );
576
577        let volume_options = VolumeCreateRequest {
578            name: Some(self.isolation_group_name.clone()),
579            driver: Some("local".to_string()),
580            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
581            ..Default::default()
582        };
583        self.docker.create_volume(volume_options).await?;
584
585        debug!(
586            driver_id = self.config.driver_id,
587            isolation_group = self.isolation_group_id,
588            "Created shared volume '{}'.",
589            self.isolation_group_name
590        );
591
592        Ok(())
593    }
594
595    async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
596        debug!(
597            driver_id = self.config.driver_id,
598            isolation_group = self.isolation_group_id,
599            "Adjusting permissions on shared volume '{}'...",
600            self.container_name
601        );
602
603        // We spin up a minimal Alpine container, chmod the directory bind-mounted to the shared volume, and that's it.
604        let image = get_alpine_container_image();
605        self.create_image_if_missing_inner(&image).await?;
606
607        let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
608        let entrypoint = vec![
609            "chmod".to_string(),
610            "-R".to_string(),
611            "777".to_string(),
612            "/airlock".to_string(),
613        ];
614        let _ = self
615            .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
616            .await?;
617
618        self.start_container_inner(&container_name).await?;
619        self.wait_for_container_exit_inner(&container_name).await?;
620        self.cleanup_inner(&container_name).await?;
621
622        Ok(())
623    }
624
625    async fn create_container_inner(
626        &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
627        mut binds: Vec<String>, env: Option<Vec<String>>,
628    ) -> Result<String, GenericError> {
629        // Take the configured binds for the container and add in our shared volume that all containers in this
630        // isolation group will use.
631        binds.push(format!("{}:/airlock:z", self.isolation_group_name));
632
633        // Map specific host-level paths into the containers so they can access host-level resources needed for origin
634        // detection.
635        binds.push("/proc:/host/proc:ro".to_string());
636        binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
637        binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
638
639        // Append any additional named volume mounts (for example, cross-group volumes for shared millstone).
640        for mount in &self.config.additional_volume_mounts {
641            binds.push(mount.clone());
642        }
643
644        // Set up NetworkingConfig to apply aliases on the primary network, if any are configured.
645        let networking_config = if !self.config.network_aliases.is_empty() {
646            let mut endpoints = HashMap::new();
647            endpoints.insert(
648                self.isolation_group_name.clone(),
649                EndpointSettings {
650                    aliases: Some(self.config.network_aliases.clone()),
651                    ..Default::default()
652                },
653            );
654            Some(
655                ContainerNetworkingConfig {
656                    endpoints_config: endpoints,
657                }
658                .into(),
659            )
660        } else {
661            None
662        };
663
664        let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
665            (None, None)
666        } else {
667            let exposed_ports: Vec<String> = self
668                .config
669                .exposed_ports
670                .iter()
671                .map(|(protocol, internal_port)| format!("{}/{}", internal_port, protocol))
672                .collect();
673            (Some(true), Some(exposed_ports))
674        };
675
676        let container_config = ContainerCreateBody {
677            hostname: Some(self.config.driver_id.to_string()),
678            env,
679            image: Some(image),
680            entrypoint,
681            cmd,
682            host_config: Some(HostConfig {
683                binds: Some(binds),
684                network_mode: Some(self.isolation_group_name.clone()),
685                publish_all_ports,
686                pid_mode: Some("host".to_string()),
687                ..Default::default()
688            }),
689            healthcheck: self.config.healthcheck.clone(),
690            exposed_ports,
691            labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
692            networking_config,
693            ..Default::default()
694        };
695
696        let create_options = CreateContainerOptionsBuilder::default().name(&container_name).build();
697
698        let response = self
699            .docker
700            .create_container(Some(create_options), container_config)
701            .await?;
702
703        Ok(response.id)
704    }
705
706    async fn create_container(&self) -> Result<(), GenericError> {
707        debug!(
708            driver_id = self.config.driver_id,
709            isolation_group = self.isolation_group_id,
710            "Creating container '{}'...",
711            self.container_name
712        );
713
714        let container_id = self
715            .create_container_inner(
716                self.container_name.clone(),
717                self.config.image.clone(),
718                self.config.entrypoint.clone(),
719                self.config.command.clone(),
720                self.config.binds.clone(),
721                Some(self.config.env.clone()),
722            )
723            .await?;
724
725        debug!(
726            driver_id = self.config.driver_id,
727            isolation_group = self.isolation_group_id,
728            "Created container '{}' (ID: {}).",
729            self.container_name,
730            container_id
731        );
732
733        Ok(())
734    }
735
736    async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
737        self.docker.start_container(container_name, None).await?;
738
739        let mut details = DriverDetails {
740            container_name: container_name.to_string(),
741            ..Default::default()
742        };
743
744        let response = self.docker.inspect_container(container_name, None).await?;
745        if let Some(network_settings) = response.network_settings {
746            if let Some(ports) = network_settings.ports {
747                let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
748                for (internal_port, bindings) in ports {
749                    if let Some(bindings) = bindings {
750                        for binding in bindings {
751                            if let Some(host_ip) = binding.host_ip.as_deref() {
752                                if host_ip == "0.0.0.0" {
753                                    let maybe_host_port =
754                                        binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());
755
756                                    if let Some(host_port) = maybe_host_port {
757                                        port_mappings.insert(internal_port, host_port);
758                                        break;
759                                    }
760                                }
761                            }
762                        }
763                    }
764                }
765            }
766        }
767
768        Ok(details)
769    }
770
771    async fn start_container(&self) -> Result<DriverDetails, GenericError> {
772        debug!(
773            driver_id = self.config.driver_id,
774            isolation_group = self.isolation_group_id,
775            "Starting container '{}'...",
776            self.container_name
777        );
778
779        let details = self.start_container_inner(&self.container_name).await?;
780
781        if let Some(log_dir) = self.log_dir.clone() {
782            debug!(
783                "Capturing logs for container '{}' to {}...",
784                self.container_name,
785                log_dir.display()
786            );
787
788            self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
789                .await?;
790        }
791
792        debug!(
793            driver_id = self.config.driver_id,
794            isolation_group = self.isolation_group_id,
795            "Started container '{}'.",
796            self.container_name
797        );
798
799        Ok(details)
800    }
801
802    /// Connects this container to each network listed in `additional_networks`.
803    ///
804    /// Called after container creation but before start, so the container is already reachable
805    /// on all configured networks from the moment it begins running.
806    async fn connect_to_additional_networks(&self) -> Result<(), GenericError> {
807        for network in &self.config.additional_networks {
808            self.docker
809                .connect_network(
810                    network,
811                    NetworkConnectRequest {
812                        container: self.container_name.clone(),
813                        endpoint_config: None,
814                    },
815                )
816                .await
817                .map_err(|e| {
818                    generic_error!(
819                        "Failed to connect container '{}' to network '{}': {}",
820                        self.container_name,
821                        network,
822                        e
823                    )
824                })?;
825        }
826        Ok(())
827    }
828
829    /// Starts the container, creating any necessary resources.
830    ///
831    /// # Errors
832    ///
833    /// If there is an error while creating the network or shared volume, while pulling the container image, or while
834    /// creating or starting the container, it will be returned.
835    pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
836        self.create_network_if_missing().await?;
837        self.create_image_if_missing().await?;
838        self.create_volume_if_missing().await?;
839        self.adjust_shared_volume_permissions().await?;
840
841        self.create_container().await?;
842        self.connect_to_additional_networks().await?;
843        self.start_container().await
844    }
845
846    /// Waits until the container is marked as healthy.
847    ///
848    /// If the container has no health checks defined, this returns early and does no waiting.
849    ///
850    /// # Errors
851    ///
852    /// If there is an error while inspecting the container, it will be returned.
853    pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
854        loop {
855            // Inspect the container, and see if it even has any health checks defined. If not, then we can return early.
856            let response = self.docker.inspect_container(&self.container_name, None).await?;
857            let state = response
858                .state
859                .ok_or_else(|| generic_error!("Container state should be present."))?;
860
861            // Make sure the container is actually running.
862            let status = state
863                .status
864                .ok_or_else(|| generic_error!("Container status should be present."))?;
865            if status != ContainerStateStatusEnum::RUNNING {
866                return Err(generic_error!(
867                    "Container exited unexpectedly (driver_id: {}, container: {}). Check logs in the test run directory.",
868                    self.config.driver_id,
869                    self.container_name
870                ));
871            }
872
873            if let Some(health_status) = state.health.and_then(|h| h.status) {
874                match health_status {
875                    // No healthcheck defined, or healthy, so we're good to go.
876                    HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
877                        debug!(
878                            driver_id = self.config.driver_id,
879                            "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
880                        );
881                        return Ok(());
882                    }
883
884                    // Not healthy yet, so we'll keep waiting.
885                    HealthStatusEnum::STARTING => {
886                        debug!(
887                            driver_id = self.config.driver_id,
888                            "Container '{}' not yet healthy. Waiting...", &self.container_name
889                        );
890                    }
891
892                    HealthStatusEnum::UNHEALTHY => {
893                        return Err(generic_error!(
894                            "Container became unhealthy (driver_id: {}, container: {}). Check logs in the test run directory.",
895                            self.config.driver_id,
896                            self.container_name
897                        ));
898                    }
899                }
900            } else {
901                debug!(
902                    driver_id = self.config.driver_id,
903                    "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
904                );
905                return Ok(());
906            }
907
908            // Wait for a second and then check again.
909            sleep(Duration::from_secs(1)).await;
910        }
911    }
912
913    async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
914        let mut wait_stream = self.docker.wait_container(container_name, None);
915        match wait_stream.next().await {
916            Some(result) => match result {
917                Ok(response) => {
918                    // When the exit code is non-zero, `bollard` transforms the normal `ContainerWaitResponse` into
919                    // `Error::DockerContainerWaitError`, which is why we have these asserts here to catch any scenario
920                    // where there's _somehow_ an error condition being indicated without it having been transformed into
921                    // `Error::DockerContainerWaitError`.
922                    //
923                    // Essentially, getting to this point should imply successfully exiting, but the API isn't very
924                    // ergonomic in that regard, so we're just making sure.
925                    assert_eq!(response.error, None);
926                    assert_eq!(response.status_code, 0);
927
928                    Ok(ExitStatus::Success)
929                }
930
931                Err(Error::DockerContainerWaitError { error, code }) => {
932                    let error = if error.is_empty() {
933                        String::from("<no error message provided>")
934                    } else {
935                        error
936                    };
937                    Ok(ExitStatus::Failed { code, error })
938                }
939
940                Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
941            },
942            None => unreachable!("Docker wait stream ended unexpectedly."),
943        }
944    }
945
946    /// Waits for the container to finish successfully.
947    ///
948    /// The container's exit status is returned, indicating success (exit code 0) or failure (exit code != 0), including
949    /// any error message related to the failure.
950    ///
951    /// # Errors
952    ///
953    /// If an error is encountered while waiting for the container to exit, it will be returned.
954    pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
955        debug!(
956            driver_id = self.config.driver_id,
957            isolation_group = self.isolation_group_id,
958            "Waiting for container '{}' to finish...",
959            &self.container_name
960        );
961
962        let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
963
964        debug!(
965            driver_id = self.config.driver_id,
966            isolation_group = self.isolation_group_id,
967            "Container '{}' finished successfully.",
968            &self.container_name
969        );
970
971        Ok(exit_status)
972    }
973
974    /// Executes a command inside the running container and returns its stdout.
975    ///
976    /// The command runs as root with no TTY. Stderr is discarded, and only stdout is returned. If the command exits with a
977    /// nonzero status, an error is returned.
978    ///
979    /// # Errors
980    ///
981    /// If the exec creation, start, output collection, or command exit code indicates failure, an error is returned.
982    pub async fn exec_in_container(&self, cmd: Vec<String>) -> Result<String, GenericError> {
983        let exec_opts = CreateExecOptions {
984            attach_stdout: Some(true),
985            attach_stderr: Some(false),
986            cmd: Some(cmd.clone()),
987            ..Default::default()
988        };
989
990        let exec = self
991            .docker
992            .create_exec(&self.container_name, exec_opts)
993            .await
994            .with_error_context(|| format!("Failed to create exec instance for container {}.", self.container_name))?;
995
996        let exec_id = exec.id.clone();
997
998        let output = self
999            .docker
1000            .start_exec(&exec.id, None)
1001            .await
1002            .with_error_context(|| format!("Failed to start exec for container {}.", self.container_name))?;
1003
1004        let mut stdout = String::new();
1005        if let StartExecResults::Attached { mut output, .. } = output {
1006            while let Some(chunk) = output.try_next().await? {
1007                if let LogOutput::StdOut { message } = chunk {
1008                    stdout.push_str(&String::from_utf8_lossy(&message));
1009                }
1010            }
1011        }
1012
1013        // Check the command's exit code.
1014        let inspect = self
1015            .docker
1016            .inspect_exec(&exec_id)
1017            .await
1018            .error_context("Failed to inspect exec result.")?;
1019
1020        if let Some(code) = inspect.exit_code {
1021            if code != 0 {
1022                return Err(generic_error!(
1023                    "Command {:?} exited with code {} in container {}.",
1024                    cmd,
1025                    code,
1026                    self.container_name
1027                ));
1028            }
1029        }
1030
1031        Ok(stdout)
1032    }
1033
1034    async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
1035        self.docker.stop_container(container_name, None).await?;
1036        self.docker.remove_container(container_name, None).await?;
1037
1038        Ok(())
1039    }
1040
1041    /// Cleans up the container, stopping and removing it from the system.
1042    ///
1043    /// # Errors
1044    ///
1045    /// If there is an error while stopping or removing the container, it will be returned.
1046    pub async fn cleanup(self) -> Result<(), GenericError> {
1047        debug!(
1048            driver_id = self.config.driver_id,
1049            isolation_group = self.isolation_group_id,
1050            "Cleaning up container '{}'...",
1051            self.container_name
1052        );
1053
1054        let start = Instant::now();
1055
1056        self.cleanup_inner(&self.container_name).await?;
1057
1058        debug!(
1059            driver_id = self.config.driver_id,
1060            isolation_group = self.isolation_group_id,
1061            "Container '{}' removed after {:?}.",
1062            self.container_name,
1063            start.elapsed()
1064        );
1065
1066        Ok(())
1067    }
1068
1069    async fn capture_container_logs(
1070        &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
1071    ) -> Result<(), GenericError> {
1072        // Make sure the directories exist first and prepare the files, just to get any permissions issues out of the
1073        // way up front before we spawn our background task.
1074        tokio::fs::create_dir_all(&container_log_dir)
1075            .await
1076            .error_context("Failed to create logs directory. Possible permissions issue.")?;
1077
1078        let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
1079        let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));
1080
1081        let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
1082            .await
1083            .map(BufWriter::new)
1084            .error_context("Failed to create standard output log file. Possible permissions issue.")?;
1085        let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
1086            .await
1087            .map(BufWriter::new)
1088            .error_context("Failed to create standard error log file. Possible permissions issue.")?;
1089
1090        // Spawn a background task to capture the logs.
1091        let logs_config = LogsOptions {
1092            follow: true,
1093            stdout: true,
1094            stderr: true,
1095            ..Default::default()
1096        };
1097        let mut log_stream = self.docker.logs(container_name, Some(logs_config));
1098
1099        tokio::spawn(async move {
1100            while let Some(log_result) = log_stream.next().await {
1101                match log_result {
1102                    Ok(log) => match log {
1103                        LogOutput::StdErr { message } => {
1104                            if let Err(e) = stderr_file.write_all(&strip_ansi_codes(&message)).await {
1105                                error!(error = %e, "Failed to write log line to standard error log file.");
1106                                break;
1107                            }
1108                            if let Err(e) = stderr_file.flush().await {
1109                                error!(error = %e, "Failed to flush standard error log file.");
1110                                break;
1111                            }
1112                        }
1113                        LogOutput::StdOut { message } => {
1114                            if let Err(e) = stdout_file.write_all(&strip_ansi_codes(&message)).await {
1115                                error!(error = %e, "Failed to write log line to standard output log file.");
1116                                break;
1117                            }
1118                            if let Err(e) = stdout_file.flush().await {
1119                                error!(error = %e, "Failed to flush standard output log file.");
1120                                break;
1121                            }
1122                        }
1123                        LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
1124                    },
1125                    Err(e) => {
1126                        error!(error = %e, "Failed to read log line from container.");
1127                        break;
1128                    }
1129                }
1130            }
1131
1132            // One final fsync to ensure the logs are fully written to disk.
1133            if let Err(e) = stdout_file.get_mut().sync_all().await {
1134                error!(error = %e, "Failed to fsync standard output log file.");
1135            }
1136
1137            if let Err(e) = stderr_file.get_mut().sync_all().await {
1138                error!(error = %e, "Failed to fsync standard error log file.");
1139            }
1140        });
1141
1142        Ok(())
1143    }
1144}
1145
/// Removes ANSI CSI escape sequences (`ESC [ ... final`) from a byte slice.
///
/// A sequence is recognized by the ECMA-48 byte classes: after `ESC [`, any number of parameter bytes
/// (`0x30..=0x3F`), then any number of intermediate bytes (`0x20..=0x2F`), then a single final byte
/// (`0x40..=0x7E`). This covers the common letter-terminated color codes as well as sequences ending in
/// characters such as `~` or `@`.
///
/// If the sequence is malformed or truncated, only the bytes that belong to it are dropped; the byte that broke
/// the sequence is kept, so ordinary text following a stray `ESC [` is not swallowed. A lone `ESC` that is not
/// followed by `[` is passed through unchanged.
fn strip_ansi_codes(input: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(input.len());
    let mut rest = input;

    while let Some((&byte, tail)) = rest.split_first() {
        if byte != 0x1b || tail.first() != Some(&b'[') {
            out.push(byte);
            rest = tail;
            continue;
        }

        // `tail` starts with `[` here, so slicing past it cannot go out of bounds.
        let mut seq = &tail[1..];

        let params = seq.iter().take_while(|b| (0x30..=0x3f).contains(*b)).count();
        seq = &seq[params..];

        let intermediates = seq.iter().take_while(|b| (0x20..=0x2f).contains(*b)).count();
        seq = &seq[intermediates..];

        // Consume the final byte only if it really is one; otherwise resume copying from the offending byte.
        rest = match seq.split_first() {
            Some((final_byte, after)) if (0x40..=0x7e).contains(final_byte) => after,
            _ => seq,
        };
    }

    out
}
1164
/// Returns the Alpine image reference to use.
///
/// The value of the `PANORAMIC_ALPINE_IMAGE` environment variable is used when it is set to a non-blank value.
/// Otherwise, including when the variable is unset, empty, whitespace-only, or not valid Unicode, `alpine:latest`
/// is returned.
fn get_alpine_container_image() -> String {
    // Normally, we would just use `alpine:latest` and let Docker figure out the registry to pull it from (that is, Docker
    // Hub) but in CI, we don't have Docker Hub available to us, so we need to use an internal registry.
    //
    // Rather than threading through this information from the top level, we simply look for an override environment
    // variable here, which lets us specify the right image reference to use in CI, while allowing normal users to just
    // grab it from Docker Hub when running locally.
    //
    // A blank override is treated as "not set": an empty image reference can never be pulled, so falling back to the
    // default is more useful than passing the empty string along.
    std::env::var("PANORAMIC_ALPINE_IMAGE")
        .ok()
        .filter(|image| !image.trim().is_empty())
        .unwrap_or_else(|| "alpine:latest".to_string())
}
1174
/// Builds the labels attached by default: a `created_by` label set to `airlock`, and an
/// `airlock-isolation-group` label set to the given isolation group ID.
fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
    HashMap::from([
        (String::from("created_by"), String::from("airlock")),
        (String::from("airlock-isolation-group"), String::from(isolation_group_id)),
    ])
}