1use std::{
2 collections::HashMap,
3 fmt,
4 path::{Path, PathBuf},
5 time::{Duration, Instant},
6};
7
8use bollard::{
9 container::LogOutput,
10 errors::Error,
11 exec::{CreateExecOptions, StartExecResults},
12 models::{
13 ContainerCreateBody, ContainerStateStatusEnum, HealthConfig, HealthStatusEnum, HostConfig, Ipam,
14 NetworkCreateRequest, VolumeCreateRequest,
15 },
16 query_parameters::{CreateContainerOptionsBuilder, CreateImageOptions, ListContainersOptionsBuilder, LogsOptions},
17 Docker,
18};
19use futures::{StreamExt as _, TryStreamExt as _};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use tokio::{
22 io::{AsyncWriteExt as _, BufWriter},
23 time::sleep,
24};
25use tracing::{debug, error, trace};
26
27use crate::config::{DatadogIntakeConfig, MillstoneConfig, TargetConfig};
28
/// Path inside the millstone container at which the millstone configuration file is bind-mounted.
const MILLSTONE_CONFIG_PATH_INTERNAL: &str = "/etc/millstone/config.toml";

/// Path inside the datadog-intake container at which the datadog-intake configuration file is
/// bind-mounted.
const DATADOG_INTAKE_CONFIG_PATH_INTERNAL: &str = "/etc/datadog-intake/config.toml";
31
/// Outcome of a container that has finished running.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ExitStatus {
    /// The container finished successfully.
    Success,

    /// The container finished unsuccessfully.
    Failed {
        /// Exit code reported for the container.
        code: i64,
        /// Error message associated with the failure.
        error: String,
    },
}

impl fmt::Display for ExitStatus {
    /// Renders the status as `success (0)` or `failed (exit code: <code>, error: <error>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Success => f.write_str("success (0)"),
            Self::Failed { code, error } => write!(f, "failed (exit code: {}, error: {})", code, error),
        }
    }
}
45
46#[derive(Clone)]
50pub struct DriverConfig {
51 driver_id: &'static str,
52 image: String,
53 entrypoint: Option<Vec<String>>,
54 command: Option<Vec<String>>,
55 env: Vec<String>,
56 binds: Vec<String>,
57 healthcheck: Option<HealthConfig>,
58 exposed_ports: Vec<(&'static str, u16)>,
59}
60
61impl DriverConfig {
62 pub async fn millstone(config: MillstoneConfig) -> Result<Self, GenericError> {
63 match tokio::fs::metadata(&config.config_path).await {
65 Ok(metadata) if metadata.is_file() => {}
66 Ok(_) => {
67 return Err(generic_error!(
68 "Specified millstone configuration path ({}) does not point to a file.",
69 config.config_path.display()
70 ))
71 }
72 Err(e) => {
73 return Err(generic_error!(
74 "Failed to ensure specified millstone configuration ({}) exists locally: {}",
75 config.config_path.display(),
76 e
77 ))
78 }
79 }
80
81 let millstone_binary_path = config
82 .binary_path
83 .unwrap_or_else(|| "/usr/local/bin/millstone".to_string());
84 let entrypoint = vec![millstone_binary_path, MILLSTONE_CONFIG_PATH_INTERNAL.to_string()];
85
86 let driver_config = Self::from_image("millstone", config.image)
87 .with_entrypoint(entrypoint)
88 .with_bind_mount(config.config_path, MILLSTONE_CONFIG_PATH_INTERNAL);
89
90 Ok(driver_config)
91 }
92
93 pub async fn datadog_intake(config: DatadogIntakeConfig) -> Result<Self, GenericError> {
94 match tokio::fs::metadata(&config.config_path).await {
96 Ok(metadata) if metadata.is_file() => {}
97 Ok(_) => {
98 return Err(generic_error!(
99 "Specified datadog-intake configuration ({}) does not point to a file.",
100 config.config_path.display()
101 ))
102 }
103 Err(e) => {
104 return Err(generic_error!(
105 "Failed to ensure specified datadog-intake configuration ({}) exists locally: {}",
106 config.config_path.display(),
107 e
108 ))
109 }
110 }
111
112 let datadog_intake_binary_path = config
113 .binary_path
114 .unwrap_or_else(|| "/usr/local/bin/datadog-intake".to_string());
115 let entrypoint = vec![
116 datadog_intake_binary_path,
117 DATADOG_INTAKE_CONFIG_PATH_INTERNAL.to_string(),
118 ];
119
120 let driver_config = DriverConfig::from_image("datadog-intake", config.image)
121 .with_entrypoint(entrypoint)
122 .with_bind_mount(config.config_path, DATADOG_INTAKE_CONFIG_PATH_INTERNAL)
123 .with_exposed_port("tcp", 2049);
126
127 Ok(driver_config)
128 }
129
130 pub async fn target(target_id: &'static str, config: TargetConfig) -> Result<Self, GenericError> {
131 let driver_config = DriverConfig::from_image(target_id, config.image)
132 .with_entrypoint(config.entrypoint)
133 .with_command(config.command)
134 .with_env_vars(config.additional_env_vars);
135
136 Ok(driver_config)
137 }
138
139 fn from_image(driver_id: &'static str, image: String) -> Self {
141 Self {
142 driver_id,
143 image,
144 entrypoint: None,
145 command: None,
146 env: vec![],
147 binds: vec![],
148 healthcheck: None,
149 exposed_ports: vec![],
150 }
151 }
152
153 pub fn with_entrypoint(mut self, entrypoint: Vec<String>) -> Self {
157 if !entrypoint.is_empty() {
158 self.entrypoint = Some(entrypoint);
159 }
160 self
161 }
162
163 pub fn with_command(mut self, command: Vec<String>) -> Self {
167 if !command.is_empty() {
168 self.command = Some(command);
169 }
170 self
171 }
172
173 pub fn with_env_var<K, V>(mut self, key: K, value: V) -> Self
175 where
176 K: AsRef<str>,
177 V: AsRef<str>,
178 {
179 self.env.push(format!("{}={}", key.as_ref(), value.as_ref()));
180 self
181 }
182
183 pub fn with_env_vars(mut self, env: Vec<String>) -> Self {
185 self.env.extend(env);
186 self
187 }
188
189 pub fn with_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
194 where
195 HP: AsRef<Path>,
196 CP: AsRef<Path>,
197 {
198 let bind_mount = format!("{}:{}", host_path.as_ref().display(), container_path.as_ref().display());
199 self.binds.push(bind_mount);
200 self
201 }
202
203 pub fn with_readonly_bind_mount<HP, CP>(mut self, host_path: HP, container_path: CP) -> Self
207 where
208 HP: AsRef<Path>,
209 CP: AsRef<Path>,
210 {
211 let bind_mount = format!(
212 "{}:{}:ro",
213 host_path.as_ref().display(),
214 container_path.as_ref().display()
215 );
216 self.binds.push(bind_mount);
217 self
218 }
219
220 pub fn with_healthcheck(
222 mut self, mut test_command: Vec<String>, interval: Duration, timeout: Duration, retries: i64,
223 start_period: Duration, start_interval: Duration,
224 ) -> Self {
225 test_command.insert(0, "CMD".to_string());
228
229 self.healthcheck = Some(HealthConfig {
230 test: Some(test_command),
231 interval: Some(interval.as_nanos() as i64),
232 timeout: Some(timeout.as_nanos() as i64),
233 retries: Some(retries),
234 start_period: Some(start_period.as_nanos() as i64),
235 start_interval: Some(start_interval.as_nanos() as i64),
236 });
237 self
238 }
239
240 pub fn with_exposed_port(mut self, protocol: &'static str, internal_port: u16) -> Self {
249 self.exposed_ports.push((protocol, internal_port));
250 self
251 }
252}
253
/// Details about a started driver container.
#[derive(Debug, Default)]
pub struct DriverDetails {
    /// Name of the container these details describe.
    container_name: String,
    /// Host ports keyed by `<internal_port>/<protocol>`; `None` when no port information was
    /// recorded.
    port_mappings: Option<HashMap<String, u16>>,
}

impl DriverDetails {
    /// Returns the name of the container.
    pub fn container_name(&self) -> &str {
        self.container_name.as_str()
    }

    /// Looks up the host port mapped to `internal_port` for `protocol`.
    ///
    /// Returns `None` when no port mappings were recorded or when no mapping exists for the
    /// `<internal_port>/<protocol>` key.
    pub fn try_get_exposed_port(&self, protocol: &str, internal_port: u16) -> Option<u16> {
        let mappings = self.port_mappings.as_ref()?;
        let key = format!("{}/{}", internal_port, protocol);
        mappings.get(&key).copied()
    }
}
278
279pub struct Driver {
281 isolation_group_id: String,
282 isolation_group_name: String,
283 container_name: String,
284 config: DriverConfig,
285 docker: Docker,
286 log_dir: Option<PathBuf>,
287}
288
289impl Driver {
290 pub fn from_config(isolation_group_id: String, config: DriverConfig) -> Result<Self, GenericError> {
310 let docker = crate::docker::connect()?;
311
312 Ok(Self {
313 isolation_group_name: format!("airlock-{}", isolation_group_id),
314 container_name: format!("airlock-{}-{}", isolation_group_id, config.driver_id),
315 isolation_group_id,
316 config,
317 docker,
318 log_dir: None,
319 })
320 }
321
322 pub fn with_logging(mut self, log_dir: PathBuf) -> Self {
328 self.log_dir = Some(log_dir);
329 self
330 }
331
332 pub fn driver_id(&self) -> &'static str {
336 self.config.driver_id
337 }
338
339 pub async fn clean_related_resources(isolation_group_id: String) -> Result<(), GenericError> {
348 let docker = crate::docker::connect()?;
349
350 let isolation_group_name = format!("airlock-{}", isolation_group_id);
351 let isolation_group_label = format!("airlock-isolation-group={}", isolation_group_id);
352
353 let list_filters: HashMap<&str, Vec<&str>> =
355 [("label", vec!["created_by=airlock", isolation_group_label.as_str()])]
356 .into_iter()
357 .collect();
358 let list_options = Some(
359 ListContainersOptionsBuilder::default()
360 .all(true)
361 .filters(&list_filters)
362 .build(),
363 );
364 let containers = docker.list_containers(list_options).await.with_error_context(|| {
365 format!(
366 "Failed to list containers attached to isolation group '{}'.",
367 isolation_group_id
368 )
369 })?;
370
371 for container in containers {
372 let container_name = match container.id {
373 Some(id) => id,
374 None => {
375 debug!("Listed container had no ID. Skipping removal.");
376 continue;
377 }
378 };
379
380 if let Err(e) = docker.stop_container(container_name.as_str(), None).await {
381 error!(error = %e, "Failed to stop container '{}'.", container_name);
382 continue;
383 } else {
384 debug!("Stopped container '{}'.", container_name);
385 }
386
387 if let Err(e) = docker.remove_container(container_name.as_str(), None).await {
388 error!(error = %e, "Failed to remove container '{}'.", container_name);
389 continue;
390 } else {
391 debug!("Removed container '{}'.", container_name);
392 }
393 }
394
395 if let Err(e) = docker
397 .remove_volume(
398 isolation_group_name.as_str(),
399 None::<bollard::query_parameters::RemoveVolumeOptions>,
400 )
401 .await
402 {
403 error!(error = %e, "Failed to remove shared volume '{}'.", isolation_group_name);
404 } else {
405 debug!("Removed shared volume '{}'.", isolation_group_name);
406 }
407
408 if let Err(e) = docker.remove_network(isolation_group_name.as_str()).await {
410 error!(error = %e, "Failed to remove shared network '{}'.", isolation_group_name);
411 } else {
412 debug!("Removed shared network '{}'.", isolation_group_name);
413 }
414
415 Ok(())
416 }
417
418 async fn create_network_if_missing(&self) -> Result<(), GenericError> {
419 let networks = self.docker.list_networks(None).await?;
421 if networks
422 .iter()
423 .any(|network| network.name.as_deref() == Some(self.isolation_group_name.as_str()))
424 {
425 debug!("Network '{}' already exists.", self.isolation_group_name);
426 return Ok(());
427 }
428
429 debug!(
430 driver_id = self.config.driver_id,
431 isolation_group = self.isolation_group_id,
432 "Network '{}' does not exist. Creating...",
433 self.isolation_group_name
434 );
435
436 let network_options = NetworkCreateRequest {
438 name: self.isolation_group_name.clone(),
439 driver: Some("bridge".to_string()),
440 ipam: Some(Ipam::default()),
441 enable_ipv6: Some(false),
442 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
443 ..Default::default()
444 };
445 let response = self.docker.create_network(network_options).await?;
446 debug!(
447 driver_id = self.config.driver_id,
448 isolation_group = self.isolation_group_id,
449 "Created network '{}' (ID: {:?}).",
450 self.isolation_group_name,
451 response.id
452 );
453
454 Ok(())
455 }
456
457 async fn create_image_if_missing_inner(&self, image: &str) -> Result<(), GenericError> {
458 let image_options = CreateImageOptions {
459 from_image: Some(image.to_string()),
460 ..Default::default()
461 };
462
463 let mut create_stream = self.docker.create_image(Some(image_options), None, None);
464 while let Some(info) = create_stream.next().await {
465 trace!(
466 driver_id = self.config.driver_id,
467 isolation_group = self.isolation_group_id,
468 image,
469 "Received image pull update: {:?}",
470 info
471 );
472 }
473
474 Ok(())
475 }
476
477 async fn create_image_if_missing(&self) -> Result<(), GenericError> {
478 debug!(
479 driver_id = self.config.driver_id,
480 isolation_group = self.isolation_group_id,
481 "Pulling image '{}'...",
482 self.config.image
483 );
484
485 self.create_image_if_missing_inner(self.config.image.as_str()).await?;
486
487 debug!(
488 driver_id = self.config.driver_id,
489 isolation_group = self.isolation_group_id,
490 "Pulled image '{}'.",
491 self.config.image
492 );
493
494 Ok(())
495 }
496
497 async fn create_volume_if_missing(&self) -> Result<(), GenericError> {
498 let volumes = self
500 .docker
501 .list_volumes(None::<bollard::query_parameters::ListVolumesOptions>)
502 .await?;
503 if volumes
504 .volumes
505 .iter()
506 .flatten()
507 .any(|volume| volume.name == self.isolation_group_name.as_str())
508 {
509 debug!("Shared volume '{}' already exists.", self.isolation_group_name);
510 return Ok(());
511 }
512
513 debug!(
514 driver_id = self.config.driver_id,
515 isolation_group = self.isolation_group_id,
516 "Shared volume '{}' does not exist. Creating...",
517 self.isolation_group_name
518 );
519
520 let volume_options = VolumeCreateRequest {
521 name: Some(self.isolation_group_name.clone()),
522 driver: Some("local".to_string()),
523 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
524 ..Default::default()
525 };
526 self.docker.create_volume(volume_options).await?;
527
528 debug!(
529 driver_id = self.config.driver_id,
530 isolation_group = self.isolation_group_id,
531 "Created shared volume '{}'.",
532 self.isolation_group_name
533 );
534
535 Ok(())
536 }
537
538 async fn adjust_shared_volume_permissions(&self) -> Result<(), GenericError> {
539 debug!(
540 driver_id = self.config.driver_id,
541 isolation_group = self.isolation_group_id,
542 "Adjusting permissions on shared volume '{}'...",
543 self.container_name
544 );
545
546 let image = get_alpine_container_image();
548 self.create_image_if_missing_inner(&image).await?;
549
550 let container_name = format!("airlock-{}-volume-fix-up", self.isolation_group_id);
551 let entrypoint = vec![
552 "chmod".to_string(),
553 "-R".to_string(),
554 "777".to_string(),
555 "/airlock".to_string(),
556 ];
557 let _ = self
558 .create_container_inner(container_name.clone(), image, Some(entrypoint), None, vec![], None)
559 .await?;
560
561 self.start_container_inner(&container_name).await?;
562 self.wait_for_container_exit_inner(&container_name).await?;
563 self.cleanup_inner(&container_name).await?;
564
565 Ok(())
566 }
567
568 async fn create_container_inner(
569 &self, container_name: String, image: String, entrypoint: Option<Vec<String>>, cmd: Option<Vec<String>>,
570 mut binds: Vec<String>, env: Option<Vec<String>>,
571 ) -> Result<String, GenericError> {
572 binds.push(format!("{}:/airlock:z", self.isolation_group_name));
575
576 binds.push("/proc:/host/proc:ro".to_string());
579 binds.push("/sys/fs/cgroup:/host/sys/fs/cgroup:ro".to_string());
580 binds.push("/var/run/docker.sock:/var/run/docker.sock:ro".to_string());
581
582 let (publish_all_ports, exposed_ports) = if self.config.exposed_ports.is_empty() {
583 (None, None)
584 } else {
585 let exposed_ports: Vec<String> = self
586 .config
587 .exposed_ports
588 .iter()
589 .map(|(protocol, internal_port)| format!("{}/{}", internal_port, protocol))
590 .collect();
591 (Some(true), Some(exposed_ports))
592 };
593
594 let container_config = ContainerCreateBody {
595 hostname: Some(self.config.driver_id.to_string()),
596 env,
597 image: Some(image),
598 entrypoint,
599 cmd,
600 host_config: Some(HostConfig {
601 binds: Some(binds),
602 network_mode: Some(self.isolation_group_name.clone()),
603 publish_all_ports,
604 pid_mode: Some("host".to_string()),
605 ..Default::default()
606 }),
607 healthcheck: self.config.healthcheck.clone(),
608 exposed_ports,
609 labels: Some(get_default_airlock_labels(self.isolation_group_id.as_str())),
610 ..Default::default()
611 };
612
613 let create_options = CreateContainerOptionsBuilder::default().name(&container_name).build();
614
615 let response = self
616 .docker
617 .create_container(Some(create_options), container_config)
618 .await?;
619
620 Ok(response.id)
621 }
622
623 async fn create_container(&self) -> Result<(), GenericError> {
624 debug!(
625 driver_id = self.config.driver_id,
626 isolation_group = self.isolation_group_id,
627 "Creating container '{}'...",
628 self.container_name
629 );
630
631 let container_id = self
632 .create_container_inner(
633 self.container_name.clone(),
634 self.config.image.clone(),
635 self.config.entrypoint.clone(),
636 self.config.command.clone(),
637 self.config.binds.clone(),
638 Some(self.config.env.clone()),
639 )
640 .await?;
641
642 debug!(
643 driver_id = self.config.driver_id,
644 isolation_group = self.isolation_group_id,
645 "Created container '{}' (ID: {}).",
646 self.container_name,
647 container_id
648 );
649
650 Ok(())
651 }
652
653 async fn start_container_inner(&self, container_name: &str) -> Result<DriverDetails, GenericError> {
654 self.docker.start_container(container_name, None).await?;
655
656 let mut details = DriverDetails {
657 container_name: container_name.to_string(),
658 ..Default::default()
659 };
660
661 let response = self.docker.inspect_container(container_name, None).await?;
662 if let Some(network_settings) = response.network_settings {
663 if let Some(ports) = network_settings.ports {
664 let port_mappings = details.port_mappings.get_or_insert_with(HashMap::new);
665 for (internal_port, bindings) in ports {
666 if let Some(bindings) = bindings {
667 for binding in bindings {
668 if let Some(host_ip) = binding.host_ip.as_deref() {
669 if host_ip == "0.0.0.0" {
670 let maybe_host_port =
671 binding.host_port.as_ref().and_then(|value| value.parse::<u16>().ok());
672
673 if let Some(host_port) = maybe_host_port {
674 port_mappings.insert(internal_port, host_port);
675 break;
676 }
677 }
678 }
679 }
680 }
681 }
682 }
683 }
684
685 Ok(details)
686 }
687
688 async fn start_container(&self) -> Result<DriverDetails, GenericError> {
689 debug!(
690 driver_id = self.config.driver_id,
691 isolation_group = self.isolation_group_id,
692 "Starting container '{}'...",
693 self.container_name
694 );
695
696 let details = self.start_container_inner(&self.container_name).await?;
697
698 if let Some(log_dir) = self.log_dir.clone() {
699 debug!(
700 "Capturing logs for container '{}' to {}...",
701 self.container_name,
702 log_dir.display()
703 );
704
705 self.capture_container_logs(log_dir, self.config.driver_id, &self.container_name)
706 .await?;
707 }
708
709 debug!(
710 driver_id = self.config.driver_id,
711 isolation_group = self.isolation_group_id,
712 "Started container '{}'.",
713 self.container_name
714 );
715
716 Ok(details)
717 }
718
719 pub async fn start(&mut self) -> Result<DriverDetails, GenericError> {
726 self.create_network_if_missing().await?;
727 self.create_image_if_missing().await?;
728 self.create_volume_if_missing().await?;
729 self.adjust_shared_volume_permissions().await?;
730
731 self.create_container().await?;
732 self.start_container().await
733 }
734
735 pub async fn wait_for_container_healthy(&mut self) -> Result<(), GenericError> {
743 loop {
744 let response = self.docker.inspect_container(&self.container_name, None).await?;
746 let state = response
747 .state
748 .ok_or_else(|| generic_error!("Container state should be present."))?;
749
750 let status = state
752 .status
753 .ok_or_else(|| generic_error!("Container status should be present."))?;
754 if status != ContainerStateStatusEnum::RUNNING {
755 return Err(generic_error!(
756 "Container exited unexpectedly (driver_id: {}, container: {}). Check logs in the test run directory.",
757 self.config.driver_id,
758 self.container_name
759 ));
760 }
761
762 if let Some(health_status) = state.health.and_then(|h| h.status) {
763 match health_status {
764 HealthStatusEnum::EMPTY | HealthStatusEnum::NONE | HealthStatusEnum::HEALTHY => {
766 debug!(
767 driver_id = self.config.driver_id,
768 "Container '{}' healthy or no healthcheck defined. Proceeding.", &self.container_name
769 );
770 return Ok(());
771 }
772
773 HealthStatusEnum::STARTING | HealthStatusEnum::UNHEALTHY => {
775 debug!(
776 driver_id = self.config.driver_id,
777 "Container '{}' not yet healthy. Waiting...", &self.container_name
778 );
779 }
780 }
781 } else {
782 debug!(
783 driver_id = self.config.driver_id,
784 "Container '{}' has no healthcheck defined. Proceeding.", &self.container_name
785 );
786 return Ok(());
787 }
788
789 sleep(Duration::from_secs(1)).await;
791 }
792 }
793
794 async fn wait_for_container_exit_inner(&self, container_name: &str) -> Result<ExitStatus, GenericError> {
795 let mut wait_stream = self.docker.wait_container(container_name, None);
796 match wait_stream.next().await {
797 Some(result) => match result {
798 Ok(response) => {
799 assert_eq!(response.error, None);
807 assert_eq!(response.status_code, 0);
808
809 Ok(ExitStatus::Success)
810 }
811
812 Err(Error::DockerContainerWaitError { error, code }) => {
813 let error = if error.is_empty() {
814 String::from("<no error message provided>")
815 } else {
816 error
817 };
818 Ok(ExitStatus::Failed { code, error })
819 }
820
821 Err(e) => Err(generic_error!("Failed to wait for container to finish: {:?}", e)),
822 },
823 None => unreachable!("Docker wait stream ended unexpectedly."),
824 }
825 }
826
827 pub async fn wait_for_container_exit(&self) -> Result<ExitStatus, GenericError> {
836 debug!(
837 driver_id = self.config.driver_id,
838 isolation_group = self.isolation_group_id,
839 "Waiting for container '{}' to finish...",
840 &self.container_name
841 );
842
843 let exit_status = self.wait_for_container_exit_inner(&self.container_name).await?;
844
845 debug!(
846 driver_id = self.config.driver_id,
847 isolation_group = self.isolation_group_id,
848 "Container '{}' finished successfully.",
849 &self.container_name
850 );
851
852 Ok(exit_status)
853 }
854
855 pub async fn exec_in_container(&self, cmd: Vec<String>) -> Result<String, GenericError> {
864 let exec_opts = CreateExecOptions {
865 attach_stdout: Some(true),
866 attach_stderr: Some(false),
867 cmd: Some(cmd.clone()),
868 ..Default::default()
869 };
870
871 let exec = self
872 .docker
873 .create_exec(&self.container_name, exec_opts)
874 .await
875 .with_error_context(|| format!("Failed to create exec instance for container {}.", self.container_name))?;
876
877 let exec_id = exec.id.clone();
878
879 let output = self
880 .docker
881 .start_exec(&exec.id, None)
882 .await
883 .with_error_context(|| format!("Failed to start exec for container {}.", self.container_name))?;
884
885 let mut stdout = String::new();
886 if let StartExecResults::Attached { mut output, .. } = output {
887 while let Some(chunk) = output.try_next().await? {
888 if let LogOutput::StdOut { message } = chunk {
889 stdout.push_str(&String::from_utf8_lossy(&message));
890 }
891 }
892 }
893
894 let inspect = self
896 .docker
897 .inspect_exec(&exec_id)
898 .await
899 .error_context("Failed to inspect exec result.")?;
900
901 if let Some(code) = inspect.exit_code {
902 if code != 0 {
903 return Err(generic_error!(
904 "Command {:?} exited with code {} in container {}.",
905 cmd,
906 code,
907 self.container_name
908 ));
909 }
910 }
911
912 Ok(stdout)
913 }
914
915 async fn cleanup_inner(&self, container_name: &str) -> Result<(), GenericError> {
916 self.docker.stop_container(container_name, None).await?;
917 self.docker.remove_container(container_name, None).await?;
918
919 Ok(())
920 }
921
922 pub async fn cleanup(self) -> Result<(), GenericError> {
928 debug!(
929 driver_id = self.config.driver_id,
930 isolation_group = self.isolation_group_id,
931 "Cleaning up container '{}'...",
932 self.container_name
933 );
934
935 let start = Instant::now();
936
937 self.cleanup_inner(&self.container_name).await?;
938
939 debug!(
940 driver_id = self.config.driver_id,
941 isolation_group = self.isolation_group_id,
942 "Container '{}' removed after {:?}.",
943 self.container_name,
944 start.elapsed()
945 );
946
947 Ok(())
948 }
949
950 async fn capture_container_logs(
951 &self, container_log_dir: PathBuf, log_name: &str, container_name: &str,
952 ) -> Result<(), GenericError> {
953 tokio::fs::create_dir_all(&container_log_dir)
956 .await
957 .error_context("Failed to create logs directory. Possible permissions issue.")?;
958
959 let stdout_log_path = container_log_dir.join(format!("{}.stdout.log", log_name));
960 let stderr_log_path = container_log_dir.join(format!("{}.stderr.log", log_name));
961
962 let mut stdout_file = tokio::fs::File::create(&stdout_log_path)
963 .await
964 .map(BufWriter::new)
965 .error_context("Failed to create standard output log file. Possible permissions issue.")?;
966 let mut stderr_file = tokio::fs::File::create(&stderr_log_path)
967 .await
968 .map(BufWriter::new)
969 .error_context("Failed to create standard error log file. Possible permissions issue.")?;
970
971 let logs_config = LogsOptions {
973 follow: true,
974 stdout: true,
975 stderr: true,
976 ..Default::default()
977 };
978 let mut log_stream = self.docker.logs(container_name, Some(logs_config));
979
980 tokio::spawn(async move {
981 while let Some(log_result) = log_stream.next().await {
982 match log_result {
983 Ok(log) => match log {
984 LogOutput::StdErr { message } => {
985 if let Err(e) = stderr_file.write_all(&message[..]).await {
986 error!(error = %e, "Failed to write log line to standard error log file.");
987 break;
988 }
989 if let Err(e) = stderr_file.flush().await {
990 error!(error = %e, "Failed to flush standard error log file.");
991 break;
992 }
993 }
994 LogOutput::StdOut { message } => {
995 if let Err(e) = stdout_file.write_all(&message[..]).await {
996 error!(error = %e, "Failed to write log line to standard output log file.");
997 break;
998 }
999 if let Err(e) = stdout_file.flush().await {
1000 error!(error = %e, "Failed to flush standard output log file.");
1001 break;
1002 }
1003 }
1004 LogOutput::StdIn { .. } | LogOutput::Console { .. } => {}
1005 },
1006 Err(e) => {
1007 error!(error = %e, "Failed to read log line from container.");
1008 break;
1009 }
1010 }
1011 }
1012
1013 if let Err(e) = stdout_file.get_mut().sync_all().await {
1015 error!(error = %e, "Failed to fsync standard output log file.");
1016 }
1017
1018 if let Err(e) = stderr_file.get_mut().sync_all().await {
1019 error!(error = %e, "Failed to fsync standard error log file.");
1020 }
1021 });
1022
1023 Ok(())
1024 }
1025}
1026
/// Returns the Alpine image reference to use for helper containers.
///
/// The `PANORAMIC_ALPINE_IMAGE` environment variable overrides the default of `alpine:latest`.
/// An unset, non-Unicode, empty, or whitespace-only value falls back to the default, since an
/// empty image reference can never be pulled.
fn get_alpine_container_image() -> String {
    std::env::var("PANORAMIC_ALPINE_IMAGE")
        .ok()
        .filter(|image| !image.trim().is_empty())
        .unwrap_or_else(|| "alpine:latest".to_string())
}
1036
/// Builds the labels attached to every airlock-created resource.
///
/// The result contains `created_by=airlock` and `airlock-isolation-group=<isolation_group_id>`.
fn get_default_airlock_labels(isolation_group_id: &str) -> HashMap<String, String> {
    HashMap::from([
        ("created_by".to_string(), "airlock".to_string()),
        ("airlock-isolation-group".to_string(), isolation_group_id.to_string()),
    ])
}