1use std::{
2 collections::HashMap,
3 fmt,
4 path::{Path, PathBuf},
5 time::{Duration, Instant},
6};
7
8use bollard::{
9 container::{LogOutput, NetworkingConfig as ContainerNetworkingConfig},
10 errors::Error,
11 exec::{CreateExecOptions, StartExecResults},
12 models::{
13 ContainerCreateBody, ContainerStateStatusEnum, EndpointSettings, HealthConfig, HealthStatusEnum, HostConfig,
14 Ipam, NetworkConnectRequest, NetworkCreateRequest, VolumeCreateRequest,
15 },
16 query_parameters::{CreateContainerOptionsBuilder, CreateImageOptions, ListContainersOptionsBuilder, LogsOptions},
17 Docker,
18};
19use futures::{StreamExt as _, TryStreamExt as _};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use tokio::{
22 io::{AsyncWriteExt as _, BufWriter},
23 time::sleep,
24};
25use tracing::{debug, error, trace};
26
27use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
28
/// Path inside the millstone container where the millstone configuration file is bind-mounted.
const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";
/// Interval passed to the datadog-intake container healthcheck.
const DATADOG_INTAKE_HEALTHCHECK_INTERVAL: Duration = Duration::from_secs(1);
/// Timeout passed to the datadog-intake container healthcheck.
const DATADOG_INTAKE_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(1);
/// Retry count passed to the datadog-intake container healthcheck.
const DATADOG_INTAKE_HEALTHCHECK_RETRIES: i64 = 30;
/// Start period passed to the datadog-intake container healthcheck.
const DATADOG_INTAKE_HEALTHCHECK_START_PERIOD: Duration = Duration::from_secs(1);
/// Start interval passed to the datadog-intake container healthcheck.
const DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL: Duration = Duration::from_secs(1);
/// Shell command used as the datadog-intake healthcheck (it is run through `/bin/bash -c`).
///
/// Opens a TCP connection to `127.0.0.1:2049` on file descriptor 3, writes a `GET /ready` HTTP/1.1 request to it,
/// and succeeds only if the response read back from the connection contains `200 OK`.
const DATADOG_INTAKE_HEALTHCHECK_COMMAND: &str = concat!(
    "exec 3<>/dev/tcp/127.0.0.1/2049 && ",
    "printf 'GET /ready HTTP/1.1\\r\\nHost: localhost\\r\\nConnection: close\\r\\n\\r\\n' >&3 && ",
    "grep -q '200 OK' <&3"
);
40
/// Outcome of waiting for a container to exit.
pub enum ExitStatus {
    /// The container exited without a reported failure.
    Success,
    /// The container failed; carries the reported exit code and error message.
    Failed { code: i64, error: String },
}

impl fmt::Display for ExitStatus {
    /// Renders the status as a short, human-readable description.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Failed { code, error } => {
                f.write_fmt(format_args!("failed (exit code: {}, error: {})", code, error))
            }
            Self::Success => f.write_str("success (0)"),
        }
    }
}
54
/// Operating system family of the container being driven.
///
/// In this file the value selects the Docker network driver, the set of bind mounts added to the container, the PID
/// mode, and whether the shared-volume permission fix-up step runs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ContainerOs {
    /// Linux container (the default used by `DriverConfig::from_image`).
    Linux,
    /// Windows container.
    Windows,
}
63
/// Configuration describing a single container to be run by a `Driver`.
#[derive(Clone)]
pub struct DriverConfig {
    // Identifier of the driver; used as the container hostname and as the suffix of the container name.
    driver_id: &'static str,
    // Container image to pull and run.
    image: String,
    // Optional entrypoint override; `None` leaves the entrypoint unset in the create request.
    entrypoint: Option<Vec<String>>,
    // Optional command override; `None` leaves the command unset in the create request.
    command: Option<Vec<String>>,
    // Environment variable entries passed to the container (`with_env_var` pushes `KEY=VALUE` strings).
    env: Vec<String>,
    // Bind mount specifications of the form `host:container` or `host:container:ro`.
    binds: Vec<String>,
    // Optional healthcheck definition attached to the container.
    healthcheck: Option<HealthConfig>,
    // `(protocol, internal port)` pairs to expose; when non-empty, all exposed ports are published.
    exposed_ports: Vec<(&'static str, u16)>,
    // Operating system family of the container.
    container_os: ContainerOs,
    // Extra `volume:container_path` mounts appended after the default binds.
    additional_volume_mounts: Vec<String>,

    // Aliases registered for the container on the isolation group network.
    network_aliases: Vec<String>,

    // Names of additional networks the container is connected to after creation.
    additional_networks: Vec<String>,
}
102
103impl DriverConfig {
104 pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
105 match tokio::fs::metadata(&config.config_path).await {
107 Ok(metadata) if metadata.is_file() => {}
108 Ok(_) => {
109 return Err(generic_error!(
110 "Specified millstone configuration path ({}) does not point to a file.",
111 config.config_path.display()
112 ))
113 }
114 Err(e) => {
115 return Err(generic_error!(
116 "Failed to ensure specified millstone configuration ({}) exists locally: {}",
117 config.config_path.display(),
118 e
119 ))
120 }
121 }
122
123 let millstone_binary_path = config
124 .binary_path
125 .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
126 let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
127
128 let driver_config = Self::from_image("millstone", config.image)
129 .with_entrypoint(entrypoint)
130 .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
131
132 Ok(driver_config)
133 }
134
135 pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
136 let datadog_intake_binary_path = config
137 .binary_path
138 .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
139 let entrypoint = vec![datadog_intake_binary_path];
140
141 let driver_config = DriverConfig::from_image("datadog-intake", config.image)
142 .with_entrypoint(entrypoint)
143 .with_healthcheck(
144 vec![
145 "/bin/bash".to_string(),
146 "-c".to_string(),
147 DATADOG_INTAKE_HEALTHCHECK_COMMAND.to_string(),
148 ],
149 DATADOG_INTAKE_HEALTHCHECK_INTERVAL,
150 DATADOG_INTAKE_HEALTHCHECK_TIMEOUT,
151 DATADOG_INTAKE_HEALTHCHECK_RETRIES,
152 DATADOG_INTAKE_HEALTHCHECK_START_PERIOD,
153 DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL,
154 )
155 .with_exposed_port("tcp", 2049);
158
159 Ok(driver_config)
160 }
161
162 pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
163 let driver_config = DriverConfig::from_image(target_id, config.image)
164 .with_entrypoint(config.entrypoint)
165 .with_command(config.command)
166 .with_env_vars(config.additional_env_vars)
167 .with_container_os(config.container_os);
168
169 Ok(driver_config)
170 }
171
172 pub fn from_image(driver_id: &'static str, image: String) -> Self {
174 Self {
175 driver_id,
176 image,
177 entrypoint: None,
178 command: None,
179 env: vec![],
180 binds: vec![],
181 healthcheck: None,
182 exposed_ports: vec![],
183 container_os: ContainerOs::Linux,
184 additional_volume_mounts: vec![],
185 network_aliases: vec![],
186 additional_networks: vec![],
187 }
188 }
189
190 pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
194 if !entrypoint.is_empty() {
195 self.entrypoint = Some(entrypoint);
196 }
197 self
198 }
199
200 pub fn with_command(mut self, command: Vec<String>) -> Self {
204 if !command.is_empty() {
205 self.command = Some(command);
206 }
207 self
208 }
209
210 pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
212 where
213 K: AsRef<str>,
214 V: AsRef<str>,
215 {
216 self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
217 self
218 }
219
220 pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
222 self.env.extend(env);
223 self
224 }
225
226 pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
231 where
232 HP: AsRef<Path>,
233 CP: AsRef<Path>,
234 {
235 let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
236 self.binds.push(bind_mount);
237 self
238 }
239
240 pub fn with_readonly_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
244 where
245 HP: AsRef<Path>,
246 CP: AsRef<Path>,
247 {
248 let bind_mount = format!(
249 "{}:{}:ro",
250 host_path.as_ref().display(),
251 container_path.as_ref().display()
252 );
253 self.binds.push(bind_mount);
254 self
255 }
256
257 pub fn with_healthcheck(
259 mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
260 start_period: Duration, start_interval: Duration,
261 ) -> Self {
262 test_command.insert(0, "CMD".to_string());
265
266 self.healthcheck = Some(HealthConfig {
267 test: Some(test_command),
268 interval: Some(interval.as_nanos() as i64),
269 timeout: Some(timeout.as_nanos() as i64),
270 retries: Some(retries),
271 start_period: Some(start_period.as_nanos() as i64),
272 start_interval: Some(start_interval.as_nanos() as i64),
273 });
274 self
275 }
276
277 pub fn with_network_alias(mut self, alias: impl Into<String>) -> Self {
282 self.network_aliases.push(alias.into());
283 self
284 }
285
286 pub fn with_network(mut self, network: impl Into<String>) -> Self {
292 self.additional_networks.push(network.into());
293 self
294 }
295
296 pub fn with_volume_mount(mut self, volume_name: impl Into<String>, container_path: impl AsRef<Path>) -> Self {
304 self.additional_volume_mounts
305 .push(format!("{}:{}", volume_name.into(), container_path.as_ref().display()));
306 self
307 }
308
309 pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
318 self.exposed_ports.push((protocol, internal_port));
319 self
320 }
321
322 pub fn with_container_os(mut self, container_os: ContainerOs) -> Self {
329 self.container_os = container_os;
330 self
331 }
332
333 fn needs_shared_volume_permission_fixup(&self) -> bool {
342 self.container_os == ContainerOs::Linux
343 }
344
345 fn network_driver(&self) -> &'static str {
350 match self.container_os {
351 ContainerOs::Linux => "bridge",
352 ContainerOs::Windows => "nat",
353 }
354 }
355
356 fn container_binds_from(&self, isolation_group_name: &str, mut binds: Vec<String>) -> Vec<String> {
365 match self.container_os {
366 ContainerOs::Linux => {
367 binds.push(format!("{}:/airlock:z", isolation_group_name));
368 binds.push("/proc:/host/proc:ro".to_string());
369 binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
370 binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
371 }
372 ContainerOs::Windows => {
373 binds.push(format!("{}:C:\\airlock", isolation_group_name));
374 }
375 }
376
377 binds.extend(self.additional_volume_mounts.clone());
378 binds
379 }
380}
381
/// Details about a container collected right after it was started.
#[derive(Debug, Default)]
pub struct DriverDetails {
    // Name of the started container.
    container_name: String,
    // Non-empty IP address of the container on the isolation group network, when one was reported.
    container_ip: Option<String>,
    // Host port per internal port, keyed by `port/protocol` (e.g. `2049/tcp`); `None` if no ports were reported.
    port_mappings: Option<HashMap<String, u16>>,
}
389
/// Records `internal_port -> host_port` in `port_mappings` when `host_port` is present and parses as a `u16`.
///
/// A missing or unparseable host port leaves the map untouched.
fn insert_port_mapping_if_parseable(
    port_mappings: &mut HashMap<String, u16>, internal_port: impl Into<String>, host_port: Option<&str>,
) {
    if let Some(Ok(parsed_port)) = host_port.map(str::parse::<u16>) {
        port_mappings.insert(internal_port.into(), parsed_port);
    }
}
402
403impl DriverDetails {
404 pub fn container_name(&self) -> &str {
406 &self.container_name
407 }
408
409 pub fn container_ip(&self) -> Option<&str> {
411 self.container_ip.as_deref()
412 }
413
414 pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
420 self.port_mappings
421 .as_ref()
422 .and_then(|port_mappings| port_mappings.get(&format!("{}/{}", internal_port, protocol)).copied())
423 }
424}
425
/// Drives the lifecycle of a single container within an isolation group.
pub struct Driver {
    // Identifier of the isolation group; used in resource labels and names.
    isolation_group_id: String,
    // `airlock-<isolation_group_id>`; used as the name of the shared network and shared volume.
    isolation_group_name: String,
    // `airlock-<isolation_group_id>-<driver_id>`; name of the container managed by this driver.
    container_name: String,
    // Configuration of the container to run.
    config: DriverConfig,
    // Docker client used for all API calls.
    docker: Docker,
    // Directory to capture container logs into, if logging was enabled via `with_logging`.
    log_dir: Option<PathBuf>,
}
435
436impl Driver {
437 pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
457 let docker = crate::docker::connect()?;
458
459 Ok(Self {
460 isolation_group_name: format!("airlock-{}", isolation_group_id),
461 container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
462 isolation_group_id,
463 config,
464 docker,
465 log_dir: None,
466 })
467 }
468
469 pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
475 self.log_dir = Some(log_dir);
476 self
477 }
478
479 pub fn driver_id(&self) -> &'static str {
483 self.config.driver_id
484 }
485
486 pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
495 let docker = crate::docker::connect()?;
496
497 let isolation_group_name = format!("airlock-{}", isolation_group_id);
498 let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
499
500 let list_filters: HashMap<&str, Vec<&str>> =
502 [("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
503 .into_iter()
504 .collect();
505 let list_options = Some(
506 ListContainersOptionsBuilder::default()
507 .all(true)
508 .filters(&list_filters)
509 .build(),
510 );
511 let containers = docker.list_containers(list_options).await.with_error_context(|| {
512 format!(
513 "Failed to list containers attached to isolation group '{}'.",
514 isolation_group_id
515 )
516 })?;
517
518 for container in containers {
519 let container_name = match container.id {
520 Some(id) => id,
521 None => {
522 debug!("Listed container had no ID. Skipping removal.");
523 continue;
524 }
525 };
526
527 if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
528 error!(error = %e, "Failed to stop container '{}'.", container_name);
529 continue;
530 } else {
531 debug!("Stopped container '{}'.", container_name);
532 }
533
534 if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
535 error!(error = %e, "Failed to remove container '{}'.", container_name);
536 continue;
537 } else {
538 debug!("Removed container '{}'.", container_name);
539 }
540 }
541
542 if let Err(e) = docker
544 .remove_volume(
545 isolation_group_name.as_str(),
546 None::<bollard::query_parameters::RemoveVolumeOptions>,
547 )
548 .await
549 {
550 error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
551 } else {
552 debug!("Removed shared volume '{}'.", isolation_group_name);
553 }
554
555 if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
557 error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
558 } else {
559 debug!("Removed shared network '{}'.", isolation_group_name);
560 }
561
562 Ok(())
563 }
564
565 async fn create_network_if_missing(&self) -> Result<(), GenericError> {
566 let networks = self.docker.list_networks(None).await?;
568 if networks
569 .iter()
570 .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
571 {
572 debug!("Network '{}' already exists.", self.isolation_group_name);
573 return Ok(());
574 }
575
576 debug!(
577 driver_id = self.config.driver_id,
578 isolation_group = self.isolation_group_id,
579 "Network '{}' does not exist. Creating...",
580 self.isolation_group_name
581 );
582
583 let network_options = NetworkCreateRequest {
585 name: self.isolation_group_name.clone(),
586 driver: Some(self.config.network_driver().to_string()),
587 ipam: Some(Ipam::default()),
588 enable_ipv6: Some(false),
589 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
590 ..Default::default()
591 };
592 let response = self.docker.create_network(network_options).await?;
593 debug!(
594 driver_id = self.config.driver_id,
595 isolation_group = self.isolation_group_id,
596 "Created network '{}' (ID: {:?}).",
597 self.isolation_group_name,
598 response.id
599 );
600
601 Ok(())
602 }
603
604 async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
605 let image_options = CreateImageOptions {
606 from_image: Some(image.to_string()),
607 ..Default::default()
608 };
609
610 let mut create_stream = self.docker.create_image(Some(image_options), None, None);
611 while let Some(info) = create_stream.next().await {
612 trace!(
613 driver_id = self.config.driver_id,
614 isolation_group = self.isolation_group_id,
615 image,
616 "Received image pull update: {:?}",
617 info
618 );
619 }
620
621 Ok(())
622 }
623
624 async fn create_image_if_missing(&self) -> Result<(), GenericError> {
625 debug!(
626 driver_id = self.config.driver_id,
627 isolation_group = self.isolation_group_id,
628 "Pulling image '{}'...",
629 self.config.image
630 );
631
632 self.create_image_if_missing_inner(self.config.image.as_str()).await?;
633
634 debug!(
635 driver_id = self.config.driver_id,
636 isolation_group = self.isolation_group_id,
637 "Pulled image '{}'.",
638 self.config.image
639 );
640
641 Ok(())
642 }
643
644 async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
645 let volumes = self
647 .docker
648 .list_volumes(None::<bollard::query_parameters::ListVolumesOptions>)
649 .await?;
650 if volumes
651 .volumes
652 .iter()
653 .flatten()
654 .any(|volume| volume.name == self.isolation_group_name.as_str())
655 {
656 debug!("Shared volume '{}' already exists.", self.isolation_group_name);
657 return Ok(());
658 }
659
660 debug!(
661 driver_id = self.config.driver_id,
662 isolation_group = self.isolation_group_id,
663 "Shared volume '{}' does not exist. Creating...",
664 self.isolation_group_name
665 );
666
667 let volume_options = VolumeCreateRequest {
668 name: Some(self.isolation_group_name.clone()),
669 driver: Some("local".to_string()),
670 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
671 ..Default::default()
672 };
673 self.docker.create_volume(volume_options).await?;
674
675 debug!(
676 driver_id = self.config.driver_id,
677 isolation_group = self.isolation_group_id,
678 "Created shared volume '{}'.",
679 self.isolation_group_name
680 );
681
682 Ok(())
683 }
684
685 async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
686 debug!(
687 driver_id = self.config.driver_id,
688 isolation_group = self.isolation_group_id,
689 "Adjusting permissions on shared volume '{}'...",
690 self.container_name
691 );
692
693 let image = get_alpine_container_image();
695 self.create_image_if_missing_inner(&image).await?;
696
697 let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
698 let entrypoint = vec![
699 "chmod".to_string(),
700 "-R".to_string(),
701 "777".to_string(),
702 "/airlock".to_string(),
703 ];
704 let _ = self
705 .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
706 .await?;
707
708 self.start_container_inner(&container_name).await?;
709 self.wait_for_container_exit_inner(&container_name).await?;
710 self.cleanup_inner(&container_name).await?;
711
712 Ok(())
713 }
714
715 async fn create_container_inner(
716 &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
717 binds: Vec<String>, env: Option<Vec<String>>,
718 ) -> Result<String, GenericError> {
719 let binds = self.config.container_binds_from(&self.isolation_group_name, binds);
720
721 let networking_config = if !self.config.network_aliases.is_empty() {
723 let mut endpoints = HashMap::new();
724 endpoints.insert(
725 self.isolation_group_name.clone(),
726 EndpointSettings {
727 aliases: Some(self.config.network_aliases.clone()),
728 ..Default::default()
729 },
730 );
731 Some(
732 ContainerNetworkingConfig {
733 endpoints_config: endpoints,
734 }
735 .into(),
736 )
737 } else {
738 None
739 };
740
741 let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
742 (None, None)
743 } else {
744 let exposed_ports: Vec<String> = self
745 .config
746 .exposed_ports
747 .iter()
748 .map(|(protocol, internal_port)| format!("{}/{}", internal_port, protocol))
749 .collect();
750 (Some(true), Some(exposed_ports))
751 };
752
753 let pid_mode = match self.config.container_os {
758 ContainerOs::Linux => Some("host".to_string()),
759 ContainerOs::Windows => None,
760 };
761
762 let container_config = ContainerCreateBody {
763 hostname: Some(self.config.driver_id.to_string()),
764 env,
765 image: Some(image),
766 entrypoint,
767 cmd,
768 host_config: Some(HostConfig {
769 binds: Some(binds),
770 network_mode: Some(self.isolation_group_name.clone()),
771 publish_all_ports,
772 pid_mode,
773 ..Default::default()
774 }),
775 healthcheck: self.config.healthcheck.clone(),
776 exposed_ports,
777 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
778 networking_config,
779 ..Default::default()
780 };
781
782 let create_options = CreateContainerOptionsBuilder::default().name(&container_name).build();
783
784 let response = self
785 .docker
786 .create_container(Some(create_options), container_config)
787 .await?;
788
789 Ok(response.id)
790 }
791
792 async fn create_container(&self) -> Result<(), GenericError> {
793 debug!(
794 driver_id = self.config.driver_id,
795 isolation_group = self.isolation_group_id,
796 "Creating container '{}'...",
797 self.container_name
798 );
799
800 let container_id = self
801 .create_container_inner(
802 self.container_name.clone(),
803 self.config.image.clone(),
804 self.config.entrypoint.clone(),
805 self.config.command.clone(),
806 self.config.binds.clone(),
807 Some(self.config.env.clone()),
808 )
809 .await?;
810
811 debug!(
812 driver_id = self.config.driver_id,
813 isolation_group = self.isolation_group_id,
814 "Created container '{}' (ID: {}).",
815 self.container_name,
816 container_id
817 );
818
819 Ok(())
820 }
821
822 async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
823 self.docker.start_container(container_name, None).await?;
824
825 let mut details = DriverDetails {
826 container_name: container_name.to_string(),
827 ..Default::default()
828 };
829
830 let response = self.docker.inspect_container(container_name, None).await?;
831 if let Some(network_settings) = response.network_settings {
832 if let Some(networks) = network_settings.networks.as_ref() {
836 details.container_ip = networks
837 .get(&self.isolation_group_name)
838 .and_then(|settings| settings.ip_address.clone())
839 .filter(|address| !address.is_empty());
840 }
841
842 if let Some(ports) = network_settings.ports {
843 let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
844 for (internal_port, bindings) in ports {
845 if let Some(bindings) = bindings {
846 for binding in bindings {
847 insert_port_mapping_if_parseable(
848 port_mappings,
849 internal_port.clone(),
850 binding.host_port.as_deref(),
851 );
852 if port_mappings.contains_key(&internal_port) {
853 break;
854 }
855 }
856 }
857 }
858 }
859 }
860
861 Ok(details)
862 }
863
864 async fn start_container(&self) -> Result<DriverDetails, GenericError> {
865 debug!(
866 driver_id = self.config.driver_id,
867 isolation_group = self.isolation_group_id,
868 "Starting container '{}'...",
869 self.container_name
870 );
871
872 let details = self.start_container_inner(&self.container_name).await?;
873
874 if let Some(log_dir) = self.log_dir.clone() {
875 debug!(
876 "Capturing logs for container '{}' to {}...",
877 self.container_name,
878 log_dir.display()
879 );
880
881 self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
882 .await?;
883 }
884
885 debug!(
886 driver_id = self.config.driver_id,
887 isolation_group = self.isolation_group_id,
888 "Started container '{}'.",
889 self.container_name
890 );
891
892 Ok(details)
893 }
894
895 async fn connect_to_additional_networks(&self) -> Result<(), GenericError> {
900 for network in &self.config.additional_networks {
901 self.docker
902 .connect_network(
903 network,
904 NetworkConnectRequest {
905 container: self.container_name.clone(),
906 endpoint_config: None,
907 },
908 )
909 .await
910 .map_err(|e| {
911 generic_error!(
912 "Failed to connect container '{}' to network '{}': {}",
913 self.container_name,
914 network,
915 e
916 )
917 })?;
918 }
919 Ok(())
920 }
921
922 pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
929 self.create_network_if_missing().await?;
930 self.create_image_if_missing().await?;
931 self.create_volume_if_missing().await?;
932 if self.config.needs_shared_volume_permission_fixup() {
933 self.adjust_shared_volume_permissions().await?;
934 }
935
936 self.create_container().await?;
937 self.connect_to_additional_networks().await?;
938 self.start_container().await
939 }
940
941 pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
949 loop {
950 let response = self.docker.inspect_container(&self.container_name, None).await?;
952 let state = response
953 .state
954 .ok_or_else(|| generic_error!("Container state should be present."))?;
955
956 let status = state
958 .status
959 .ok_or_else(|| generic_error!("Container status should be present."))?;
960 if status != ContainerStateStatusEnum::RUNNING {
961 return Err(generic_error!(
962 "Container exited unexpectedly (driver_id: {}, container: {}). Check logs in the test run directory.",
963 self.config.driver_id,
964 self.container_name
965 ));
966 }
967
968 if let Some(health_status) = state.health.and_then(|h| h.status) {
969 match health_status {
970 HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
972 debug!(
973 driver_id = self.config.driver_id,
974 "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
975 );
976 return Ok(());
977 }
978
979 HealthStatusEnum::STARTING => {
981 debug!(
982 driver_id = self.config.driver_id,
983 "Container '{}' not yet healthy. Waiting...", &self.container_name
984 );
985 }
986
987 HealthStatusEnum::UNHEALTHY => {
988 return Err(generic_error!(
989 "Container became unhealthy (driver_id: {}, container: {}). Check logs in the test run directory.",
990 self.config.driver_id,
991 self.container_name
992 ));
993 }
994 }
995 } else {
996 debug!(
997 driver_id = self.config.driver_id,
998 "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
999 );
1000 return Ok(());
1001 }
1002
1003 sleep(Duration::from_secs(1)).await;
1005 }
1006 }
1007
1008 async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
1009 let mut wait_stream = self.docker.wait_container(container_name, None);
1010 match wait_stream.next().await {
1011 Some(result) => match result {
1012 Ok(response) => {
1013 assert_eq!(response.error, None);
1021 assert_eq!(response.status_code, 0);
1022
1023 Ok(ExitStatus::Success)
1024 }
1025
1026 Err(Error::DockerContainerWaitError { error, code }) => {
1027 let error = if error.is_empty() {
1028 String::from("<no error message provided>")
1029 } else {
1030 error
1031 };
1032 Ok(ExitStatus::Failed { code, error })
1033 }
1034
1035 Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
1036 },
1037 None => unreachable!("Docker wait stream ended unexpectedly."),
1038 }
1039 }
1040
1041 pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
1050 debug!(
1051 driver_id = self.config.driver_id,
1052 isolation_group = self.isolation_group_id,
1053 "Waiting for container '{}' to finish...",
1054 &self.container_name
1055 );
1056
1057 let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
1058
1059 debug!(
1060 driver_id = self.config.driver_id,
1061 isolation_group = self.isolation_group_id,
1062 "Container '{}' finished successfully.",
1063 &self.container_name
1064 );
1065
1066 Ok(exit_status)
1067 }
1068
1069 pub async fn exec_in_container(&self, cmd: Vec<String>) -> Result<String, GenericError> {
1078 let exec_opts = CreateExecOptions {
1079 attach_stdout: Some(true),
1080 attach_stderr: Some(false),
1081 cmd: Some(cmd.clone()),
1082 ..Default::default()
1083 };
1084
1085 let exec = self
1086 .docker
1087 .create_exec(&self.container_name, exec_opts)
1088 .await
1089 .with_error_context(|| format!("Failed to create exec instance for container {}.", self.container_name))?;
1090
1091 let exec_id = exec.id.clone();
1092
1093 let output = self
1094 .docker
1095 .start_exec(&exec.id, None)
1096 .await
1097 .with_error_context(|| format!("Failed to start exec for container {}.", self.container_name))?;
1098
1099 let mut stdout = String::new();
1100 if let StartExecResults::Attached { mut output, .. } = output {
1101 while let Some(chunk) = output.try_next().await? {
1102 if let LogOutput::StdOut { message } = chunk {
1103 stdout.push_str(&String::from_utf8_lossy(&message));
1104 }
1105 }
1106 }
1107
1108 let inspect = self
1110 .docker
1111 .inspect_exec(&exec_id)
1112 .await
1113 .error_context("Failed to inspect exec result.")?;
1114
1115 if let Some(code) = inspect.exit_code {
1116 if code != 0 {
1117 return Err(generic_error!(
1118 "Command {:?} exited with code {} in container {}.",
1119 cmd,
1120 code,
1121 self.container_name
1122 ));
1123 }
1124 }
1125
1126 Ok(stdout)
1127 }
1128
1129 async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
1130 self.docker.stop_container(container_name, None).await?;
1131 self.docker.remove_container(container_name, None).await?;
1132
1133 Ok(())
1134 }
1135
1136 pub async fn cleanup(self) -> Result<(), GenericError> {
1142 debug!(
1143 driver_id = self.config.driver_id,
1144 isolation_group = self.isolation_group_id,
1145 "Cleaning up container '{}'...",
1146 self.container_name
1147 );
1148
1149 let start = Instant::now();
1150
1151 self.cleanup_inner(&self.container_name).await?;
1152
1153 debug!(
1154 driver_id = self.config.driver_id,
1155 isolation_group = self.isolation_group_id,
1156 "Container '{}' removed after {:?}.",
1157 self.container_name,
1158 start.elapsed()
1159 );
1160
1161 Ok(())
1162 }
1163
1164 async fn capture_container_logs(
1165 &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
1166 ) -> Result<(), GenericError> {
1167 tokio::fs::create_dir_all(&container_log_dir)
1170 .await
1171 .error_context("Failed to create logs directory. Possible permissions issue.")?;
1172
1173 let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
1174 let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));
1175
1176 let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
1177 .await
1178 .map(BufWriter::new)
1179 .error_context("Failed to create standard output log file. Possible permissions issue.")?;
1180 let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
1181 .await
1182 .map(BufWriter::new)
1183 .error_context("Failed to create standard error log file. Possible permissions issue.")?;
1184
1185 let logs_config = LogsOptions {
1187 follow: true,
1188 stdout: true,
1189 stderr: true,
1190 ..Default::default()
1191 };
1192 let mut log_stream = self.docker.logs(container_name, Some(logs_config));
1193
1194 tokio::spawn(async move {
1195 while let Some(log_result) = log_stream.next().await {
1196 match log_result {
1197 Ok(log) => match log {
1198 LogOutput::StdErr { message } => {
1199 if let Err(e) = stderr_file.write_all(&strip_ansi_codes(&message)).await {
1200 error!(error = %e, "Failed to write log line to standard error log file.");
1201 break;
1202 }
1203 if let Err(e) = stderr_file.flush().await {
1204 error!(error = %e, "Failed to flush standard error log file.");
1205 break;
1206 }
1207 }
1208 LogOutput::StdOut { message } => {
1209 if let Err(e) = stdout_file.write_all(&strip_ansi_codes(&message)).await {
1210 error!(error = %e, "Failed to write log line to standard output log file.");
1211 break;
1212 }
1213 if let Err(e) = stdout_file.flush().await {
1214 error!(error = %e, "Failed to flush standard output log file.");
1215 break;
1216 }
1217 }
1218 LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
1219 },
1220 Err(e) => {
1221 error!(error = %e, "Failed to read log line from container.");
1222 break;
1223 }
1224 }
1225 }
1226
1227 if let Err(e) = stdout_file.get_mut().sync_all().await {
1229 error!(error = %e, "Failed to fsync standard output log file.");
1230 }
1231
1232 if let Err(e) = stderr_file.get_mut().sync_all().await {
1233 error!(error = %e, "Failed to fsync standard error log file.");
1234 }
1235 });
1236
1237 Ok(())
1238 }
1239}
1240
/// Removes ANSI CSI escape sequences (`ESC [ ... <final byte>`) from `input`.
///
/// A sequence starts with `ESC` (0x1b) followed by `[` and runs through the first final byte, which is any byte in
/// the range `0x40..=0x7E`. This covers letters such as the `m` of colour codes as well as non-letter final bytes
/// such as `~`. An unterminated sequence consumes the rest of the input. All other bytes, including a lone `ESC`,
/// are copied through unchanged.
fn strip_ansi_codes(input: &[u8]) -> Vec<u8> {
    const ESC: u8 = 0x1b;

    let mut out = Vec::with_capacity(input.len());
    let mut rest = input;
    while let Some((&byte, tail)) = rest.split_first() {
        if byte == ESC && tail.first() == Some(&b'[') {
            // Skip past the `[` and everything up to and including the final byte.
            let sequence = &tail[1..];
            rest = match sequence.iter().position(|b| (0x40..=0x7e).contains(b)) {
                Some(final_index) => &sequence[final_index + 1..],
                None => &[],
            };
        } else {
            out.push(byte);
            rest = tail;
        }
    }
    out
}
1259
/// Returns the Alpine image reference to use for helper containers.
///
/// Reads the `PANORAMIC_ALPINE_IMAGE` environment variable and falls back to `alpine:latest` when it cannot be
/// read.
fn get_alpine_container_image() -> String {
    match std::env::var("PANORAMIC_ALPINE_IMAGE") {
        Ok(image) => image,
        Err(_) => "alpine:latest".to_string(),
    }
}
1269
/// Builds the labels attached to every resource created for an isolation group.
///
/// The result contains `created_by=airlock` and `airlock-isolation-group=<isolation_group_id>`.
fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
    HashMap::from([
        ("created_by".to_string(), "airlock".to_string()),
        ("airlock-isolation-group".to_string(), isolation_group_id.to_string()),
    ])
}
1276
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a default (Linux) configuration for the tests below.
    fn linux_config() -> DriverConfig {
        DriverConfig::from_image("target", "example:latest".to_string())
    }

    /// Builds a configuration for a Windows container for the tests below.
    fn windows_config() -> DriverConfig {
        linux_config().with_container_os(ContainerOs::Windows)
    }

    #[test]
    fn default_linux_container_binds_include_airlock_and_linux_host_resources() {
        let config = linux_config();

        let binds = config.container_binds_from("airlock-test", config.binds.clone());

        let expected_binds = [
            "airlock-test:/airlock:z",
            "/proc:/host/proc:ro",
            "/sys/fs/cgroup:/host/sys/fs/cgroup:ro",
            "/var/run/docker.sock:/var/run/docker.sock:ro",
        ];
        for expected in expected_binds {
            assert!(binds.iter().any(|bind| bind.as_str() == expected));
        }
    }

    #[test]
    fn windows_container_binds_use_windows_airlock_and_skip_linux_host_resources() {
        let config = windows_config();

        let binds = config.container_binds_from("airlock-test", config.binds.clone());

        assert!(binds.iter().any(|bind| bind.as_str() == "airlock-test:C:\\airlock"));
        for linux_only in ["/proc", "/sys/fs/cgroup", "/var/run/docker.sock"] {
            assert!(binds.iter().all(|bind| !bind.contains(linux_only)));
        }
        assert!(binds.iter().all(|bind| !bind.ends_with(":z")));
    }

    #[tokio::test]
    async fn target_config_preserves_windows_container_os() {
        let target = TargetConfig {
            image: "example:latest".to_string(),
            entrypoint: Vec::new(),
            command: Vec::new(),
            additional_env_vars: Vec::new(),
            container_os: ContainerOs::Windows,
        };

        let config = DriverConfig::target("target", target).await.unwrap();

        assert_eq!(config.container_os, ContainerOs::Windows);
    }

    #[test]
    fn port_mapping_inserts_parseable_host_port() {
        let mut mappings = HashMap::new();

        insert_port_mapping_if_parseable(&mut mappings, "55100/tcp", Some("49152"));

        assert_eq!(mappings.get("55100/tcp").copied(), Some(49152));
    }

    #[test]
    fn port_mapping_ignores_invalid_host_port() {
        let mut mappings = HashMap::new();

        insert_port_mapping_if_parseable(&mut mappings, "55100/tcp", Some("not-a-port"));

        assert!(mappings.get("55100/tcp").is_none());
    }

    #[test]
    fn windows_container_skips_shared_volume_permission_fixup() {
        let needs_fixup = windows_config().needs_shared_volume_permission_fixup();

        assert!(!needs_fixup);
    }

    #[test]
    fn windows_container_uses_nat_network_driver() {
        let driver = windows_config().network_driver();

        assert_eq!(driver, "nat");
    }

    #[test]
    fn linux_container_uses_bridge_network_driver() {
        let driver = linux_config().network_driver();

        assert_eq!(driver, "bridge");
    }
}