1use std::{
2 collections::HashMap,
3 fmt,
4 path::{Path, PathBuf},
5 time::{Duration, Instant},
6};
7
8use bollard::{
9 container::{LogOutput, NetworkingConfig as ContainerNetworkingConfig},
10 errors::Error,
11 exec::{CreateExecOptions, StartExecResults},
12 models::{
13 ContainerCreateBody, ContainerStateStatusEnum, EndpointSettings, HealthConfig, HealthStatusEnum, HostConfig,
14 Ipam, NetworkConnectRequest, NetworkCreateRequest, VolumeCreateRequest,
15 },
16 query_parameters::{CreateContainerOptionsBuilder, CreateImageOptions, ListContainersOptionsBuilder, LogsOptions},
17 Docker,
18};
19use futures::{StreamExt as _, TryStreamExt as _};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use tokio::{
22 io::{AsyncWriteExt as _, BufWriter},
23 time::sleep,
24};
25use tracing::{debug, error, trace};
26
27use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
28
29const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";
30const DATADOG_INTAKE_HEALTHCHECK_INTERVAL: Duration = Duration::from_secs(1);
31const DATADOG_INTAKE_HEALTHCHECK_TIMEOUT: Duration = Duration::from_secs(1);
32const DATADOG_INTAKE_HEALTHCHECK_RETRIES: i64 = 30;
33const DATADOG_INTAKE_HEALTHCHECK_START_PERIOD: Duration = Duration::from_secs(1);
34const DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL: Duration = Duration::from_secs(1);
35const DATADOG_INTAKE_HEALTHCHECK_COMMAND: &str = concat!(
36 "exec 3<>/dev/tcp/127.0.0.1/2049 && ",
37 "printf 'GET /ready HTTP/1.1\\r\\nHost: localhost\\r\\nConnection: close\\r\\n\\r\\n' >&3 && ",
38 "grep -q '200 OK' <&3"
39);
40
41pub enum ExitStatus {
42 Success,
43 Failed { code: i64, error: String },
44}
45
46impl fmt::Display for ExitStatus {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 match self {
49 ExitStatus::Success => write!(f, "success (0)"),
50 ExitStatus::Failed { code, error } => write!(f, "failed (exit code: {}, error: {})", code, error),
51 }
52 }
53}
54
55#[derive(Clone)]
59pub struct DriverConfig {
60 driver_id: &'static str,
61 image: String,
62 entrypoint: Option<Vec<String>>,
63 command: Option<Vec<String>>,
64 env: Vec<String>,
65 binds: Vec<String>,
66 healthcheck: Option<HealthConfig>,
67 exposed_ports: Vec<(&'static str, u16)>,
68 additional_volume_mounts: Vec<String>,
75
76 network_aliases: Vec<String>,
83
84 additional_networks: Vec<String>,
91}
92
93impl DriverConfig {
94 pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
95 match tokio::fs::metadata(&config.config_path).await {
97 Ok(metadata) if metadata.is_file() => {}
98 Ok(_) => {
99 return Err(generic_error!(
100 "Specified millstone configuration path ({}) does not point to a file.",
101 config.config_path.display()
102 ))
103 }
104 Err(e) => {
105 return Err(generic_error!(
106 "Failed to ensure specified millstone configuration ({}) exists locally: {}",
107 config.config_path.display(),
108 e
109 ))
110 }
111 }
112
113 let millstone_binary_path = config
114 .binary_path
115 .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
116 let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
117
118 let driver_config = Self::from_image("millstone", config.image)
119 .with_entrypoint(entrypoint)
120 .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
121
122 Ok(driver_config)
123 }
124
125 pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
126 let datadog_intake_binary_path = config
127 .binary_path
128 .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
129 let entrypoint = vec![datadog_intake_binary_path];
130
131 let driver_config = DriverConfig::from_image("datadog-intake", config.image)
132 .with_entrypoint(entrypoint)
133 .with_healthcheck(
134 vec![
135 "/bin/bash".to_string(),
136 "-c".to_string(),
137 DATADOG_INTAKE_HEALTHCHECK_COMMAND.to_string(),
138 ],
139 DATADOG_INTAKE_HEALTHCHECK_INTERVAL,
140 DATADOG_INTAKE_HEALTHCHECK_TIMEOUT,
141 DATADOG_INTAKE_HEALTHCHECK_RETRIES,
142 DATADOG_INTAKE_HEALTHCHECK_START_PERIOD,
143 DATADOG_INTAKE_HEALTHCHECK_START_INTERVAL,
144 )
145 .with_exposed_port("tcp", 2049);
148
149 Ok(driver_config)
150 }
151
152 pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
153 let driver_config = DriverConfig::from_image(target_id, config.image)
154 .with_entrypoint(config.entrypoint)
155 .with_command(config.command)
156 .with_env_vars(config.additional_env_vars);
157
158 Ok(driver_config)
159 }
160
161 pub fn from_image(driver_id: &'static str, image: String) -> Self {
163 Self {
164 driver_id,
165 image,
166 entrypoint: None,
167 command: None,
168 env: vec![],
169 binds: vec![],
170 healthcheck: None,
171 exposed_ports: vec![],
172 additional_volume_mounts: vec![],
173 network_aliases: vec![],
174 additional_networks: vec![],
175 }
176 }
177
178 pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
182 if !entrypoint.is_empty() {
183 self.entrypoint = Some(entrypoint);
184 }
185 self
186 }
187
188 pub fn with_command(mut self, command: Vec<String>) -> Self {
192 if !command.is_empty() {
193 self.command = Some(command);
194 }
195 self
196 }
197
198 pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
200 where
201 K: AsRef<str>,
202 V: AsRef<str>,
203 {
204 self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
205 self
206 }
207
208 pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
210 self.env.extend(env);
211 self
212 }
213
214 pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
219 where
220 HP: AsRef<Path>,
221 CP: AsRef<Path>,
222 {
223 let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
224 self.binds.push(bind_mount);
225 self
226 }
227
228 pub fn with_readonly_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
232 where
233 HP: AsRef<Path>,
234 CP: AsRef<Path>,
235 {
236 let bind_mount = format!(
237 "{}:{}:ro",
238 host_path.as_ref().display(),
239 container_path.as_ref().display()
240 );
241 self.binds.push(bind_mount);
242 self
243 }
244
245 pub fn with_healthcheck(
247 mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
248 start_period: Duration, start_interval: Duration,
249 ) -> Self {
250 test_command.insert(0, "CMD".to_string());
253
254 self.healthcheck = Some(HealthConfig {
255 test: Some(test_command),
256 interval: Some(interval.as_nanos() as i64),
257 timeout: Some(timeout.as_nanos() as i64),
258 retries: Some(retries),
259 start_period: Some(start_period.as_nanos() as i64),
260 start_interval: Some(start_interval.as_nanos() as i64),
261 });
262 self
263 }
264
265 pub fn with_network_alias(mut self, alias: impl Into<String>) -> Self {
270 self.network_aliases.push(alias.into());
271 self
272 }
273
274 pub fn with_network(mut self, network: impl Into<String>) -> Self {
280 self.additional_networks.push(network.into());
281 self
282 }
283
284 pub fn with_volume_mount(mut self, volume_name: impl Into<String>, container_path: impl AsRef<Path>) -> Self {
292 self.additional_volume_mounts
293 .push(format!("{}:{}", volume_name.into(), container_path.as_ref().display()));
294 self
295 }
296
297 pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
306 self.exposed_ports.push((protocol, internal_port));
307 self
308 }
309}
310
311#[derive(Debug, Default)]
313pub struct DriverDetails {
314 container_name: String,
315 port_mappings: Option<HashMap<String, u16>>,
316}
317
318impl DriverDetails {
319 pub fn container_name(&self) -> &str {
321 &self.container_name
322 }
323
324 pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
330 self.port_mappings
331 .as_ref()
332 .and_then(|port_mappings| port_mappings.get(&format!("{}/{}", internal_port, protocol)).copied())
333 }
334}
335
336pub struct Driver {
338 isolation_group_id: String,
339 isolation_group_name: String,
340 container_name: String,
341 config: DriverConfig,
342 docker: Docker,
343 log_dir: Option<PathBuf>,
344}
345
346impl Driver {
347 pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
367 let docker = crate::docker::connect()?;
368
369 Ok(Self {
370 isolation_group_name: format!("airlock-{}", isolation_group_id),
371 container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
372 isolation_group_id,
373 config,
374 docker,
375 log_dir: None,
376 })
377 }
378
379 pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
385 self.log_dir = Some(log_dir);
386 self
387 }
388
389 pub fn driver_id(&self) -> &'static str {
393 self.config.driver_id
394 }
395
396 pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
405 let docker = crate::docker::connect()?;
406
407 let isolation_group_name = format!("airlock-{}", isolation_group_id);
408 let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
409
410 let list_filters: HashMap<&str, Vec<&str>> =
412 [("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
413 .into_iter()
414 .collect();
415 let list_options = Some(
416 ListContainersOptionsBuilder::default()
417 .all(true)
418 .filters(&list_filters)
419 .build(),
420 );
421 let containers = docker.list_containers(list_options).await.with_error_context(|| {
422 format!(
423 "Failed to list containers attached to isolation group '{}'.",
424 isolation_group_id
425 )
426 })?;
427
428 for container in containers {
429 let container_name = match container.id {
430 Some(id) => id,
431 None => {
432 debug!("Listed container had no ID. Skipping removal.");
433 continue;
434 }
435 };
436
437 if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
438 error!(error = %e, "Failed to stop container '{}'.", container_name);
439 continue;
440 } else {
441 debug!("Stopped container '{}'.", container_name);
442 }
443
444 if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
445 error!(error = %e, "Failed to remove container '{}'.", container_name);
446 continue;
447 } else {
448 debug!("Removed container '{}'.", container_name);
449 }
450 }
451
452 if let Err(e) = docker
454 .remove_volume(
455 isolation_group_name.as_str(),
456 None::<bollard::query_parameters::RemoveVolumeOptions>,
457 )
458 .await
459 {
460 error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
461 } else {
462 debug!("Removed shared volume '{}'.", isolation_group_name);
463 }
464
465 if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
467 error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
468 } else {
469 debug!("Removed shared network '{}'.", isolation_group_name);
470 }
471
472 Ok(())
473 }
474
475 async fn create_network_if_missing(&self) -> Result<(), GenericError> {
476 let networks = self.docker.list_networks(None).await?;
478 if networks
479 .iter()
480 .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
481 {
482 debug!("Network '{}' already exists.", self.isolation_group_name);
483 return Ok(());
484 }
485
486 debug!(
487 driver_id = self.config.driver_id,
488 isolation_group = self.isolation_group_id,
489 "Network '{}' does not exist. Creating...",
490 self.isolation_group_name
491 );
492
493 let network_options = NetworkCreateRequest {
495 name: self.isolation_group_name.clone(),
496 driver: Some("bridge".to_string()),
497 ipam: Some(Ipam::default()),
498 enable_ipv6: Some(false),
499 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
500 ..Default::default()
501 };
502 let response = self.docker.create_network(network_options).await?;
503 debug!(
504 driver_id = self.config.driver_id,
505 isolation_group = self.isolation_group_id,
506 "Created network '{}' (ID: {:?}).",
507 self.isolation_group_name,
508 response.id
509 );
510
511 Ok(())
512 }
513
514 async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
515 let image_options = CreateImageOptions {
516 from_image: Some(image.to_string()),
517 ..Default::default()
518 };
519
520 let mut create_stream = self.docker.create_image(Some(image_options), None, None);
521 while let Some(info) = create_stream.next().await {
522 trace!(
523 driver_id = self.config.driver_id,
524 isolation_group = self.isolation_group_id,
525 image,
526 "Received image pull update: {:?}",
527 info
528 );
529 }
530
531 Ok(())
532 }
533
534 async fn create_image_if_missing(&self) -> Result<(), GenericError> {
535 debug!(
536 driver_id = self.config.driver_id,
537 isolation_group = self.isolation_group_id,
538 "Pulling image '{}'...",
539 self.config.image
540 );
541
542 self.create_image_if_missing_inner(self.config.image.as_str()).await?;
543
544 debug!(
545 driver_id = self.config.driver_id,
546 isolation_group = self.isolation_group_id,
547 "Pulled image '{}'.",
548 self.config.image
549 );
550
551 Ok(())
552 }
553
554 async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
555 let volumes = self
557 .docker
558 .list_volumes(None::<bollard::query_parameters::ListVolumesOptions>)
559 .await?;
560 if volumes
561 .volumes
562 .iter()
563 .flatten()
564 .any(|volume| volume.name == self.isolation_group_name.as_str())
565 {
566 debug!("Shared volume '{}' already exists.", self.isolation_group_name);
567 return Ok(());
568 }
569
570 debug!(
571 driver_id = self.config.driver_id,
572 isolation_group = self.isolation_group_id,
573 "Shared volume '{}' does not exist. Creating...",
574 self.isolation_group_name
575 );
576
577 let volume_options = VolumeCreateRequest {
578 name: Some(self.isolation_group_name.clone()),
579 driver: Some("local".to_string()),
580 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
581 ..Default::default()
582 };
583 self.docker.create_volume(volume_options).await?;
584
585 debug!(
586 driver_id = self.config.driver_id,
587 isolation_group = self.isolation_group_id,
588 "Created shared volume '{}'.",
589 self.isolation_group_name
590 );
591
592 Ok(())
593 }
594
595 async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
596 debug!(
597 driver_id = self.config.driver_id,
598 isolation_group = self.isolation_group_id,
599 "Adjusting permissions on shared volume '{}'...",
600 self.container_name
601 );
602
603 let image = get_alpine_container_image();
605 self.create_image_if_missing_inner(&image).await?;
606
607 let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
608 let entrypoint = vec![
609 "chmod".to_string(),
610 "-R".to_string(),
611 "777".to_string(),
612 "/airlock".to_string(),
613 ];
614 let _ = self
615 .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
616 .await?;
617
618 self.start_container_inner(&container_name).await?;
619 self.wait_for_container_exit_inner(&container_name).await?;
620 self.cleanup_inner(&container_name).await?;
621
622 Ok(())
623 }
624
625 async fn create_container_inner(
626 &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
627 mut binds: Vec<String>, env: Option<Vec<String>>,
628 ) -> Result<String, GenericError> {
629 binds.push(format!("{}:/airlock:z", self.isolation_group_name));
632
633 binds.push("/proc:/host/proc:ro".to_string());
636 binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
637 binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
638
639 for mount in &self.config.additional_volume_mounts {
641 binds.push(mount.clone());
642 }
643
644 let networking_config = if !self.config.network_aliases.is_empty() {
646 let mut endpoints = HashMap::new();
647 endpoints.insert(
648 self.isolation_group_name.clone(),
649 EndpointSettings {
650 aliases: Some(self.config.network_aliases.clone()),
651 ..Default::default()
652 },
653 );
654 Some(
655 ContainerNetworkingConfig {
656 endpoints_config: endpoints,
657 }
658 .into(),
659 )
660 } else {
661 None
662 };
663
664 let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
665 (None, None)
666 } else {
667 let exposed_ports: Vec<String> = self
668 .config
669 .exposed_ports
670 .iter()
671 .map(|(protocol, internal_port)| format!("{}/{}", internal_port, protocol))
672 .collect();
673 (Some(true), Some(exposed_ports))
674 };
675
676 let container_config = ContainerCreateBody {
677 hostname: Some(self.config.driver_id.to_string()),
678 env,
679 image: Some(image),
680 entrypoint,
681 cmd,
682 host_config: Some(HostConfig {
683 binds: Some(binds),
684 network_mode: Some(self.isolation_group_name.clone()),
685 publish_all_ports,
686 pid_mode: Some("host".to_string()),
687 ..Default::default()
688 }),
689 healthcheck: self.config.healthcheck.clone(),
690 exposed_ports,
691 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
692 networking_config,
693 ..Default::default()
694 };
695
696 let create_options = CreateContainerOptionsBuilder::default().name(&container_name).build();
697
698 let response = self
699 .docker
700 .create_container(Some(create_options), container_config)
701 .await?;
702
703 Ok(response.id)
704 }
705
706 async fn create_container(&self) -> Result<(), GenericError> {
707 debug!(
708 driver_id = self.config.driver_id,
709 isolation_group = self.isolation_group_id,
710 "Creating container '{}'...",
711 self.container_name
712 );
713
714 let container_id = self
715 .create_container_inner(
716 self.container_name.clone(),
717 self.config.image.clone(),
718 self.config.entrypoint.clone(),
719 self.config.command.clone(),
720 self.config.binds.clone(),
721 Some(self.config.env.clone()),
722 )
723 .await?;
724
725 debug!(
726 driver_id = self.config.driver_id,
727 isolation_group = self.isolation_group_id,
728 "Created container '{}' (ID: {}).",
729 self.container_name,
730 container_id
731 );
732
733 Ok(())
734 }
735
736 async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
737 self.docker.start_container(container_name, None).await?;
738
739 let mut details = DriverDetails {
740 container_name: container_name.to_string(),
741 ..Default::default()
742 };
743
744 let response = self.docker.inspect_container(container_name, None).await?;
745 if let Some(network_settings) = response.network_settings {
746 if let Some(ports) = network_settings.ports {
747 let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
748 for (internal_port, bindings) in ports {
749 if let Some(bindings) = bindings {
750 for binding in bindings {
751 if let Some(host_ip) = binding.host_ip.as_deref() {
752 if host_ip == "0.0.0.0" {
753 let maybe_host_port =
754 binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());
755
756 if let Some(host_port) = maybe_host_port {
757 port_mappings.insert(internal_port, host_port);
758 break;
759 }
760 }
761 }
762 }
763 }
764 }
765 }
766 }
767
768 Ok(details)
769 }
770
771 async fn start_container(&self) -> Result<DriverDetails, GenericError> {
772 debug!(
773 driver_id = self.config.driver_id,
774 isolation_group = self.isolation_group_id,
775 "Starting container '{}'...",
776 self.container_name
777 );
778
779 let details = self.start_container_inner(&self.container_name).await?;
780
781 if let Some(log_dir) = self.log_dir.clone() {
782 debug!(
783 "Capturing logs for container '{}' to {}...",
784 self.container_name,
785 log_dir.display()
786 );
787
788 self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
789 .await?;
790 }
791
792 debug!(
793 driver_id = self.config.driver_id,
794 isolation_group = self.isolation_group_id,
795 "Started container '{}'.",
796 self.container_name
797 );
798
799 Ok(details)
800 }
801
802 async fn connect_to_additional_networks(&self) -> Result<(), GenericError> {
807 for network in &self.config.additional_networks {
808 self.docker
809 .connect_network(
810 network,
811 NetworkConnectRequest {
812 container: self.container_name.clone(),
813 endpoint_config: None,
814 },
815 )
816 .await
817 .map_err(|e| {
818 generic_error!(
819 "Failed to connect container '{}' to network '{}': {}",
820 self.container_name,
821 network,
822 e
823 )
824 })?;
825 }
826 Ok(())
827 }
828
829 pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
836 self.create_network_if_missing().await?;
837 self.create_image_if_missing().await?;
838 self.create_volume_if_missing().await?;
839 self.adjust_shared_volume_permissions().await?;
840
841 self.create_container().await?;
842 self.connect_to_additional_networks().await?;
843 self.start_container().await
844 }
845
846 pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
854 loop {
855 let response = self.docker.inspect_container(&self.container_name, None).await?;
857 let state = response
858 .state
859 .ok_or_else(|| generic_error!("Container state should be present."))?;
860
861 let status = state
863 .status
864 .ok_or_else(|| generic_error!("Container status should be present."))?;
865 if status != ContainerStateStatusEnum::RUNNING {
866 return Err(generic_error!(
867 "Container exited unexpectedly (driver_id: {}, container: {}). Check logs in the test run directory.",
868 self.config.driver_id,
869 self.container_name
870 ));
871 }
872
873 if let Some(health_status) = state.health.and_then(|h| h.status) {
874 match health_status {
875 HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
877 debug!(
878 driver_id = self.config.driver_id,
879 "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
880 );
881 return Ok(());
882 }
883
884 HealthStatusEnum::STARTING => {
886 debug!(
887 driver_id = self.config.driver_id,
888 "Container '{}' not yet healthy. Waiting...", &self.container_name
889 );
890 }
891
892 HealthStatusEnum::UNHEALTHY => {
893 return Err(generic_error!(
894 "Container became unhealthy (driver_id: {}, container: {}). Check logs in the test run directory.",
895 self.config.driver_id,
896 self.container_name
897 ));
898 }
899 }
900 } else {
901 debug!(
902 driver_id = self.config.driver_id,
903 "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
904 );
905 return Ok(());
906 }
907
908 sleep(Duration::from_secs(1)).await;
910 }
911 }
912
913 async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
914 let mut wait_stream = self.docker.wait_container(container_name, None);
915 match wait_stream.next().await {
916 Some(result) => match result {
917 Ok(response) => {
918 assert_eq!(response.error, None);
926 assert_eq!(response.status_code, 0);
927
928 Ok(ExitStatus::Success)
929 }
930
931 Err(Error::DockerContainerWaitError { error, code }) => {
932 let error = if error.is_empty() {
933 String::from("<no error message provided>")
934 } else {
935 error
936 };
937 Ok(ExitStatus::Failed { code, error })
938 }
939
940 Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
941 },
942 None => unreachable!("Docker wait stream ended unexpectedly."),
943 }
944 }
945
946 pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
955 debug!(
956 driver_id = self.config.driver_id,
957 isolation_group = self.isolation_group_id,
958 "Waiting for container '{}' to finish...",
959 &self.container_name
960 );
961
962 let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
963
964 debug!(
965 driver_id = self.config.driver_id,
966 isolation_group = self.isolation_group_id,
967 "Container '{}' finished successfully.",
968 &self.container_name
969 );
970
971 Ok(exit_status)
972 }
973
974 pub async fn exec_in_container(&self, cmd: Vec<String>) -> Result<String, GenericError> {
983 let exec_opts = CreateExecOptions {
984 attach_stdout: Some(true),
985 attach_stderr: Some(false),
986 cmd: Some(cmd.clone()),
987 ..Default::default()
988 };
989
990 let exec = self
991 .docker
992 .create_exec(&self.container_name, exec_opts)
993 .await
994 .with_error_context(|| format!("Failed to create exec instance for container {}.", self.container_name))?;
995
996 let exec_id = exec.id.clone();
997
998 let output = self
999 .docker
1000 .start_exec(&exec.id, None)
1001 .await
1002 .with_error_context(|| format!("Failed to start exec for container {}.", self.container_name))?;
1003
1004 let mut stdout = String::new();
1005 if let StartExecResults::Attached { mut output, .. } = output {
1006 while let Some(chunk) = output.try_next().await? {
1007 if let LogOutput::StdOut { message } = chunk {
1008 stdout.push_str(&String::from_utf8_lossy(&message));
1009 }
1010 }
1011 }
1012
1013 let inspect = self
1015 .docker
1016 .inspect_exec(&exec_id)
1017 .await
1018 .error_context("Failed to inspect exec result.")?;
1019
1020 if let Some(code) = inspect.exit_code {
1021 if code != 0 {
1022 return Err(generic_error!(
1023 "Command {:?} exited with code {} in container {}.",
1024 cmd,
1025 code,
1026 self.container_name
1027 ));
1028 }
1029 }
1030
1031 Ok(stdout)
1032 }
1033
1034 async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
1035 self.docker.stop_container(container_name, None).await?;
1036 self.docker.remove_container(container_name, None).await?;
1037
1038 Ok(())
1039 }
1040
1041 pub async fn cleanup(self) -> Result<(), GenericError> {
1047 debug!(
1048 driver_id = self.config.driver_id,
1049 isolation_group = self.isolation_group_id,
1050 "Cleaning up container '{}'...",
1051 self.container_name
1052 );
1053
1054 let start = Instant::now();
1055
1056 self.cleanup_inner(&self.container_name).await?;
1057
1058 debug!(
1059 driver_id = self.config.driver_id,
1060 isolation_group = self.isolation_group_id,
1061 "Container '{}' removed after {:?}.",
1062 self.container_name,
1063 start.elapsed()
1064 );
1065
1066 Ok(())
1067 }
1068
1069 async fn capture_container_logs(
1070 &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
1071 ) -> Result<(), GenericError> {
1072 tokio::fs::create_dir_all(&container_log_dir)
1075 .await
1076 .error_context("Failed to create logs directory. Possible permissions issue.")?;
1077
1078 let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
1079 let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));
1080
1081 let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
1082 .await
1083 .map(BufWriter::new)
1084 .error_context("Failed to create standard output log file. Possible permissions issue.")?;
1085 let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
1086 .await
1087 .map(BufWriter::new)
1088 .error_context("Failed to create standard error log file. Possible permissions issue.")?;
1089
1090 let logs_config = LogsOptions {
1092 follow: true,
1093 stdout: true,
1094 stderr: true,
1095 ..Default::default()
1096 };
1097 let mut log_stream = self.docker.logs(container_name, Some(logs_config));
1098
1099 tokio::spawn(async move {
1100 while let Some(log_result) = log_stream.next().await {
1101 match log_result {
1102 Ok(log) => match log {
1103 LogOutput::StdErr { message } => {
1104 if let Err(e) = stderr_file.write_all(&strip_ansi_codes(&message)).await {
1105 error!(error = %e, "Failed to write log line to standard error log file.");
1106 break;
1107 }
1108 if let Err(e) = stderr_file.flush().await {
1109 error!(error = %e, "Failed to flush standard error log file.");
1110 break;
1111 }
1112 }
1113 LogOutput::StdOut { message } => {
1114 if let Err(e) = stdout_file.write_all(&strip_ansi_codes(&message)).await {
1115 error!(error = %e, "Failed to write log line to standard output log file.");
1116 break;
1117 }
1118 if let Err(e) = stdout_file.flush().await {
1119 error!(error = %e, "Failed to flush standard output log file.");
1120 break;
1121 }
1122 }
1123 LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
1124 },
1125 Err(e) => {
1126 error!(error = %e, "Failed to read log line from container.");
1127 break;
1128 }
1129 }
1130 }
1131
1132 if let Err(e) = stdout_file.get_mut().sync_all().await {
1134 error!(error = %e, "Failed to fsync standard output log file.");
1135 }
1136
1137 if let Err(e) = stderr_file.get_mut().sync_all().await {
1138 error!(error = %e, "Failed to fsync standard error log file.");
1139 }
1140 });
1141
1142 Ok(())
1143 }
1144}
1145
1146fn strip_ansi_codes(input: &[u8]) -> Vec<u8> {
1148 let mut out = Vec::with_capacity(input.len());
1149 let mut i = 0;
1150 while i < input.len() {
1151 if input[i] == 0x1b && input.get(i + 1) == Some(&b'[') {
1152 i += 2;
1153 while i < input.len() && !input[i].is_ascii_alphabetic() {
1154 i += 1;
1155 }
1156 i += 1;
1157 } else {
1158 out.push(input[i]);
1159 i += 1;
1160 }
1161 }
1162 out
1163}
1164
1165fn get_alpine_container_image() -> String {
1166 std::env::var("PANORAMIC_ALPINE_IMAGE").unwrap_or_else(|_| "alpine:latest".to_string())
1173}
1174
1175fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
1176 let mut labels = HashMap::new();
1177 labels.insert("created_by".to_string(), "airlock".to_string());
1178 labels.insert("airlock-isolation-group".to_string(), isolation_group_id.to_string());
1179 labels
1180}