1use std::{
2 collections::HashMap,
3 fmt,
4 path::{Path, PathBuf},
5 time::{Duration, Instant},
6};
7
8use bollard::{
9 container::{Config, CreateContainerOptions, ListContainersOptions, LogOutput, LogsOptions},
10 errors::Error,
11 image::CreateImageOptions,
12 models::{HealthConfig, HealthStatusEnum, HostConfig, Ipam},
13 network::CreateNetworkOptions,
14 secret::ContainerStateStatusEnum,
15 volume::CreateVolumeOptions,
16 Docker,
17};
18use futures::StreamExt as _;
19use saluki_error::{generic_error, ErrorContext as _, GenericError};
20use tokio::{
21 io::{AsyncWriteExt as _, BufWriter},
22 time::sleep,
23};
24use tracing::{debug, error, trace};
25
26use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
27
28const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";
29const DATADOG_INTAKE_CONFIG_PATH_INTERNAL: &str = "/etc/datadog-intake/config.toml";
30
31pub enum ExitStatus {
32 Success,
33 Failed { code: i64, error: String },
34}
35
36impl fmt::Display for ExitStatus {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 match self {
39 ExitStatus::Success => write!(f, "success (0)"),
40 ExitStatus::Failed { code, error } => write!(f, "failed (exit code: {}, error: {})", code, error),
41 }
42 }
43}
44
45#[derive(Clone)]
49pub struct DriverConfig {
50 driver_id: &'static str,
51 image: String,
52 entrypoint: Option<Vec<String>>,
53 command: Option<Vec<String>>,
54 env: Vec<String>,
55 binds: Vec<String>,
56 healthcheck: Option<HealthConfig>,
57 exposed_ports: Vec<(&'static str, u16)>,
58}
59
60impl DriverConfig {
61 pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
62 match tokio::fs::metadata(&config.config_path).await {
64 Ok(metadata) if metadata.is_file() => {}
65 Ok(_) => {
66 return Err(generic_error!(
67 "Specified millstone configuration path ({}) does not point to a file.",
68 config.config_path.display()
69 ))
70 }
71 Err(e) => {
72 return Err(generic_error!(
73 "Failed to ensure specified millstone configuration ({}) exists locally: {}",
74 config.config_path.display(),
75 e
76 ))
77 }
78 }
79
80 let millstone_binary_path = config
81 .binary_path
82 .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
83 let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
84
85 let driver_config = Self::from_image("millstone", config.image)
86 .with_entrypoint(entrypoint)
87 .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
88
89 Ok(driver_config)
90 }
91
92 pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
93 match tokio::fs::metadata(&config.config_path).await {
95 Ok(metadata) if metadata.is_file() => {}
96 Ok(_) => {
97 return Err(generic_error!(
98 "Specified datadog-intake configuration ({}) does not point to a file.",
99 config.config_path.display()
100 ))
101 }
102 Err(e) => {
103 return Err(generic_error!(
104 "Failed to ensure specified datadog-intake configuration ({}) exists locally: {}",
105 config.config_path.display(),
106 e
107 ))
108 }
109 }
110
111 let datadog_intake_binary_path = config
112 .binary_path
113 .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
114 let entrypoint = vec![
115 datadog_intake_binary_path,
116 DATADOG_INTAKE_CONFIG_PATH_INTERNAL.to_string(),
117 ];
118
119 let driver_config = DriverConfig::from_image("datadog-intake", config.image)
120 .with_entrypoint(entrypoint)
121 .with_bind_mount(config.config_path, DATADOG_INTAKE_CONFIG_PATH_INTERNAL)
122 .with_exposed_port("tcp", 2049);
125
126 Ok(driver_config)
127 }
128
129 pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
130 let driver_config = DriverConfig::from_image(target_id, config.image)
131 .with_entrypoint(config.entrypoint)
132 .with_command(config.command)
133 .with_env_vars(config.additional_env_vars);
134
135 Ok(driver_config)
136 }
137
138 fn from_image(driver_id: &'static str, image: String) -> Self {
140 Self {
141 driver_id,
142 image,
143 entrypoint: None,
144 command: None,
145 env: vec![],
146 binds: vec![],
147 healthcheck: None,
148 exposed_ports: vec![],
149 }
150 }
151
152 pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
156 if !entrypoint.is_empty() {
157 self.entrypoint = Some(entrypoint);
158 }
159 self
160 }
161
162 pub fn with_command(mut self, command: Vec<String>) -> Self {
166 if !command.is_empty() {
167 self.command = Some(command);
168 }
169 self
170 }
171
172 pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
174 where
175 K: AsRef<str>,
176 V: AsRef<str>,
177 {
178 self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
179 self
180 }
181
182 pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
184 self.env.extend(env);
185 self
186 }
187
188 pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
193 where
194 HP: AsRef<Path>,
195 CP: AsRef<Path>,
196 {
197 let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
198 self.binds.push(bind_mount);
199 self
200 }
201
202 pub fn with_healthcheck(
204 mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
205 start_period: Duration, start_interval: Duration,
206 ) -> Self {
207 test_command.insert(0, "CMD".to_string());
210
211 self.healthcheck = Some(HealthConfig {
212 test: Some(test_command),
213 interval: Some(interval.as_nanos() as i64),
214 timeout: Some(timeout.as_nanos() as i64),
215 retries: Some(retries),
216 start_period: Some(start_period.as_nanos() as i64),
217 start_interval: Some(start_interval.as_nanos() as i64),
218 });
219 self
220 }
221
222 pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
231 self.exposed_ports.push((protocol, internal_port));
232 self
233 }
234}
235
236#[derive(Debug, Default)]
238pub struct DriverDetails {
239 container_name: String,
240 port_mappings: Option<HashMap<String, u16>>,
241}
242
243impl DriverDetails {
244 pub fn container_name(&self) -> &str {
246 &self.container_name
247 }
248
249 pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
255 self.port_mappings
256 .as_ref()
257 .and_then(|port_mappings| port_mappings.get(&format!("{}/{}", internal_port, protocol)).copied())
258 }
259}
260
261pub struct Driver {
263 isolation_group_id: String,
264 isolation_group_name: String,
265 container_name: String,
266 config: DriverConfig,
267 docker: Docker,
268 log_dir: Option<PathBuf>,
269}
270
271impl Driver {
272 pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
292 let docker = Docker::connect_with_defaults()?;
293
294 Ok(Self {
295 isolation_group_name: format!("airlock-{}", isolation_group_id),
296 container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
297 isolation_group_id,
298 config,
299 docker,
300 log_dir: None,
301 })
302 }
303
304 pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
310 self.log_dir = Some(log_dir);
311 self
312 }
313
314 pub fn driver_id(&self) -> &'static str {
318 self.config.driver_id
319 }
320
321 pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
330 let docker = Docker::connect_with_defaults()?;
331
332 let isolation_group_name = format!("airlock-{}", isolation_group_id);
333 let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
334
335 let list_options = Some(ListContainersOptions {
337 all: true,
338 filters: vec![("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
339 .into_iter()
340 .collect(),
341 ..Default::default()
342 });
343 let containers = docker.list_containers(list_options).await.with_error_context(|| {
344 format!(
345 "Failed to list containers attached to isolation group '{}'.",
346 isolation_group_id
347 )
348 })?;
349
350 for container in containers {
351 let container_name = match container.id {
352 Some(id) => id,
353 None => {
354 debug!("Listed container had no ID. Skipping removal.");
355 continue;
356 }
357 };
358
359 if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
360 error!(error = %e, "Failed to stop container '{}'.", container_name);
361 continue;
362 } else {
363 debug!("Stopped container '{}'.", container_name);
364 }
365
366 if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
367 error!(error = %e, "Failed to remove container '{}'.", container_name);
368 continue;
369 } else {
370 debug!("Removed container '{}'.", container_name);
371 }
372 }
373
374 if let Err(e) = docker.remove_volume(isolation_group_name.as_str(), None).await {
376 error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
377 } else {
378 debug!("Removed shared volume '{}'.", isolation_group_name);
379 }
380
381 if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
383 error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
384 } else {
385 debug!("Removed shared network '{}'.", isolation_group_name);
386 }
387
388 Ok(())
389 }
390
391 async fn create_network_if_missing(&self) -> Result<(), GenericError> {
392 let networks = self.docker.list_networks::<String>(None).await?;
394 if networks
395 .iter()
396 .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
397 {
398 debug!("Network '{}' already exists.", self.isolation_group_name);
399 return Ok(());
400 }
401
402 debug!(
403 driver_id = self.config.driver_id,
404 isolation_group = self.isolation_group_id,
405 "Network '{}' does not exist. Creating...",
406 self.isolation_group_name
407 );
408
409 let network_options = CreateNetworkOptions {
411 name: self.isolation_group_name.clone(),
412 check_duplicate: true,
413 driver: "bridge".to_string(),
414 ipam: Ipam::default(),
415 enable_ipv6: false,
416 labels: get_default_airlock_labels(self.isolation_group_id.as_str()),
417 ..Default::default()
418 };
419 let response = self.docker.create_network(network_options).await?;
420 debug!(
421 driver_id = self.config.driver_id,
422 isolation_group = self.isolation_group_id,
423 "Created network '{}' (ID: {:?}).",
424 self.isolation_group_name,
425 response.id
426 );
427
428 Ok(())
429 }
430
431 async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
432 let image_options = CreateImageOptions {
433 from_image: image,
434 ..Default::default()
435 };
436
437 let mut create_stream = self.docker.create_image(Some(image_options), None, None);
438 while let Some(info) = create_stream.next().await {
439 trace!(
440 driver_id = self.config.driver_id,
441 isolation_group = self.isolation_group_id,
442 image,
443 "Received image pull update: {:?}",
444 info
445 );
446 }
447
448 Ok(())
449 }
450
451 async fn create_image_if_missing(&self) -> Result<(), GenericError> {
452 debug!(
453 driver_id = self.config.driver_id,
454 isolation_group = self.isolation_group_id,
455 "Pulling image '{}'...",
456 self.config.image
457 );
458
459 self.create_image_if_missing_inner(self.config.image.as_str()).await?;
460
461 debug!(
462 driver_id = self.config.driver_id,
463 isolation_group = self.isolation_group_id,
464 "Pulled image '{}'.",
465 self.config.image
466 );
467
468 Ok(())
469 }
470
471 async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
472 let volumes = self.docker.list_volumes::<String>(None).await?;
474 if volumes
475 .volumes
476 .iter()
477 .flatten()
478 .any(|volume| volume.name == self.isolation_group_name.as_str())
479 {
480 debug!("Shared volume '{}' already exists.", self.isolation_group_name);
481 return Ok(());
482 }
483
484 debug!(
485 driver_id = self.config.driver_id,
486 isolation_group = self.isolation_group_id,
487 "Shared volume '{}' does not exist. Creating...",
488 self.isolation_group_name
489 );
490
491 let volume_options = CreateVolumeOptions {
492 name: self.isolation_group_name.clone(),
493 driver: "local".to_string(),
494 labels: get_default_airlock_labels(self.isolation_group_id.as_str()),
495 ..Default::default()
496 };
497 self.docker.create_volume(volume_options).await?;
498
499 debug!(
500 driver_id = self.config.driver_id,
501 isolation_group = self.isolation_group_id,
502 "Created shared volume '{}'.",
503 self.isolation_group_name
504 );
505
506 Ok(())
507 }
508
509 async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
510 debug!(
511 driver_id = self.config.driver_id,
512 isolation_group = self.isolation_group_id,
513 "Adjusting permissions on shared volume '{}'...",
514 self.container_name
515 );
516
517 let image = get_alpine_container_image();
519 self.create_image_if_missing_inner(&image).await?;
520
521 let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
522 let entrypoint = vec![
523 "chmod".to_string(),
524 "-R".to_string(),
525 "777".to_string(),
526 "/airlock".to_string(),
527 ];
528 let _ = self
529 .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
530 .await?;
531
532 self.start_container_inner(&container_name).await?;
533 self.wait_for_container_exit_inner(&container_name).await?;
534 self.cleanup_inner(&container_name).await?;
535
536 Ok(())
537 }
538
539 async fn create_container_inner(
540 &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
541 mut binds: Vec<String>, env: Option<Vec<String>>,
542 ) -> Result<String, GenericError> {
543 binds.push(format!("{}:/airlock:z", self.isolation_group_name));
546
547 binds.push("/proc:/host/proc:ro".to_string());
550 binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
551 binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
552
553 let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
554 (None, None)
555 } else {
556 let mut exposed_ports = HashMap::new();
557 for (protocol, internal_port) in &self.config.exposed_ports {
558 exposed_ports.insert(format!("{}/{}", internal_port, protocol), HashMap::new());
559 }
560
561 (Some(true), Some(exposed_ports))
562 };
563
564 let container_config = Config {
565 hostname: Some(self.config.driver_id.to_string()),
566 env,
567 image: Some(image),
568 entrypoint,
569 cmd,
570 host_config: Some(HostConfig {
571 binds: Some(binds),
572 network_mode: Some(self.isolation_group_name.clone()),
573 publish_all_ports,
574 pid_mode: Some("host".to_string()),
575 ..Default::default()
576 }),
577 healthcheck: self.config.healthcheck.clone(),
578 exposed_ports,
579 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
580 ..Default::default()
581 };
582
583 let create_options = CreateContainerOptions {
584 name: container_name,
585 ..Default::default()
586 };
587
588 let response = self
589 .docker
590 .create_container(Some(create_options), container_config)
591 .await?;
592
593 Ok(response.id)
594 }
595
596 async fn create_container(&self) -> Result<(), GenericError> {
597 debug!(
598 driver_id = self.config.driver_id,
599 isolation_group = self.isolation_group_id,
600 "Creating container '{}'...",
601 self.container_name
602 );
603
604 let container_id = self
605 .create_container_inner(
606 self.container_name.clone(),
607 self.config.image.clone(),
608 self.config.entrypoint.clone(),
609 self.config.command.clone(),
610 self.config.binds.clone(),
611 Some(self.config.env.clone()),
612 )
613 .await?;
614
615 debug!(
616 driver_id = self.config.driver_id,
617 isolation_group = self.isolation_group_id,
618 "Created container '{}' (ID: {}).",
619 self.container_name,
620 container_id
621 );
622
623 Ok(())
624 }
625
626 async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
627 self.docker.start_container::<String>(container_name, None).await?;
628
629 let mut details = DriverDetails {
630 container_name: container_name.to_string(),
631 ..Default::default()
632 };
633
634 let response = self.docker.inspect_container(container_name, None).await?;
635 if let Some(network_settings) = response.network_settings {
636 if let Some(ports) = network_settings.ports {
637 let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
638 for (internal_port, bindings) in ports {
639 if let Some(bindings) = bindings {
640 for binding in bindings {
641 if let Some(host_ip) = binding.host_ip.as_deref() {
642 if host_ip == "0.0.0.0" {
643 let maybe_host_port =
644 binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());
645
646 if let Some(host_port) = maybe_host_port {
647 port_mappings.insert(internal_port, host_port);
648 break;
649 }
650 }
651 }
652 }
653 }
654 }
655 }
656 }
657
658 Ok(details)
659 }
660
661 async fn start_container(&self) -> Result<DriverDetails, GenericError> {
662 debug!(
663 driver_id = self.config.driver_id,
664 isolation_group = self.isolation_group_id,
665 "Starting container '{}'...",
666 self.container_name
667 );
668
669 let details = self.start_container_inner(&self.container_name).await?;
670
671 if let Some(log_dir) = self.log_dir.clone() {
672 debug!(
673 "Capturing logs for container '{}' to {}...",
674 self.container_name,
675 log_dir.display()
676 );
677
678 self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
679 .await?;
680 }
681
682 debug!(
683 driver_id = self.config.driver_id,
684 isolation_group = self.isolation_group_id,
685 "Started container '{}'.",
686 self.container_name
687 );
688
689 Ok(details)
690 }
691
692 pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
699 self.create_network_if_missing().await?;
700 self.create_image_if_missing().await?;
701 self.create_volume_if_missing().await?;
702 self.adjust_shared_volume_permissions().await?;
703
704 self.create_container().await?;
705 self.start_container().await
706 }
707
708 pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
716 loop {
717 let response = self.docker.inspect_container(&self.container_name, None).await?;
719 let state = response
720 .state
721 .ok_or_else(|| generic_error!("Container state should be present."))?;
722
723 let status = state
725 .status
726 .ok_or_else(|| generic_error!("Container status should be present."))?;
727 if status != ContainerStateStatusEnum::RUNNING {
728 return Err(generic_error!("Container exited unexpectedly."));
729 }
730
731 if let Some(health_status) = state.health.and_then(|h| h.status) {
732 match health_status {
733 HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
735 debug!(
736 driver_id = self.config.driver_id,
737 "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
738 );
739 return Ok(());
740 }
741
742 HealthStatusEnum::STARTING | HealthStatusEnum::UNHEALTHY => {
744 debug!(
745 driver_id = self.config.driver_id,
746 "Container '{}' not yet healthy. Waiting...", &self.container_name
747 );
748 }
749 }
750 } else {
751 debug!(
752 driver_id = self.config.driver_id,
753 "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
754 );
755 return Ok(());
756 }
757
758 sleep(Duration::from_secs(1)).await;
760 }
761 }
762
763 async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
764 let mut wait_stream = self.docker.wait_container::<String>(container_name, None);
765 match wait_stream.next().await {
766 Some(result) => match result {
767 Ok(response) => {
768 assert_eq!(response.error, None);
776 assert_eq!(response.status_code, 0);
777
778 Ok(ExitStatus::Success)
779 }
780
781 Err(Error::DockerContainerWaitError { error, code }) => {
782 let error = if error.is_empty() {
783 String::from("<no error message provided>")
784 } else {
785 error
786 };
787 Ok(ExitStatus::Failed { code, error })
788 }
789
790 Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
791 },
792 None => unreachable!("Docker wait stream ended unexpectedly."),
793 }
794 }
795
796 pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
805 debug!(
806 driver_id = self.config.driver_id,
807 isolation_group = self.isolation_group_id,
808 "Waiting for container '{}' to finish...",
809 &self.container_name
810 );
811
812 let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
813
814 debug!(
815 driver_id = self.config.driver_id,
816 isolation_group = self.isolation_group_id,
817 "Container '{}' finished successfully.",
818 &self.container_name
819 );
820
821 Ok(exit_status)
822 }
823
824 async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
825 self.docker.stop_container(container_name, None).await?;
826 self.docker.remove_container(container_name, None).await?;
827
828 Ok(())
829 }
830
831 pub async fn cleanup(self) -> Result<(), GenericError> {
837 debug!(
838 driver_id = self.config.driver_id,
839 isolation_group = self.isolation_group_id,
840 "Cleaning up container '{}'...",
841 self.container_name
842 );
843
844 let start = Instant::now();
845
846 self.cleanup_inner(&self.container_name).await?;
847
848 debug!(
849 driver_id = self.config.driver_id,
850 isolation_group = self.isolation_group_id,
851 "Container '{}' removed after {:?}.",
852 self.container_name,
853 start.elapsed()
854 );
855
856 Ok(())
857 }
858
859 async fn capture_container_logs(
860 &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
861 ) -> Result<(), GenericError> {
862 tokio::fs::create_dir_all(&container_log_dir)
865 .await
866 .error_context("Failed to create logs directory. Possible permissions issue.")?;
867
868 let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
869 let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));
870
871 let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
872 .await
873 .map(BufWriter::new)
874 .error_context("Failed to create standard output log file. Possible permissions issue.")?;
875 let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
876 .await
877 .map(BufWriter::new)
878 .error_context("Failed to create standard error log file. Possible permissions issue.")?;
879
880 let logs_config = LogsOptions {
882 follow: true,
883 stdout: true,
884 stderr: true,
885 ..Default::default()
886 };
887 let mut log_stream = self.docker.logs::<String>(container_name, Some(logs_config));
888
889 tokio::spawn(async move {
890 while let Some(log_result) = log_stream.next().await {
891 match log_result {
892 Ok(log) => match log {
893 LogOutput::StdErr { message } => {
894 if let Err(e) = stderr_file.write_all(&message[..]).await {
895 error!(error = %e, "Failed to write log line to standard error log file.");
896 break;
897 }
898 if let Err(e) = stderr_file.flush().await {
899 error!(error = %e, "Failed to flush standard error log file.");
900 break;
901 }
902 }
903 LogOutput::StdOut { message } => {
904 if let Err(e) = stdout_file.write_all(&message[..]).await {
905 error!(error = %e, "Failed to write log line to standard output log file.");
906 break;
907 }
908 if let Err(e) = stdout_file.flush().await {
909 error!(error = %e, "Failed to flush standard output log file.");
910 break;
911 }
912 }
913 LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
914 },
915 Err(e) => {
916 error!(error = %e, "Failed to read log line from container.");
917 break;
918 }
919 }
920 }
921
922 if let Err(e) = stdout_file.get_mut().sync_all().await {
924 error!(error = %e, "Failed to fsync standard output log file.");
925 }
926
927 if let Err(e) = stderr_file.get_mut().sync_all().await {
928 error!(error = %e, "Failed to fsync standard error log file.");
929 }
930 });
931
932 Ok(())
933 }
934}
935
936fn get_alpine_container_image() -> String {
937 std::env::var("GROUND_TRUTH_ALPINE_IMAGE").unwrap_or_else(|_| "alpine:latest".to_string())
944}
945
946fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
947 let mut labels = HashMap::new();
948 labels.insert("created_by".to_string(), "airlock".to_string());
949 labels.insert("airlock-isolation-group".to_string(), isolation_group_id.to_string());
950 labels
951}