saluki_env/workload/collectors/
mod.rs1use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use saluki_common::sync::shutdown::ShutdownHandle;
7use saluki_core::runtime::{InitializationError, Supervisable, SupervisorFuture};
8use saluki_error::GenericError;
9use tokio::{
10 select,
11 sync::{mpsc, Mutex},
12 time::sleep,
13};
14use tracing::{debug, warn};
15
16use super::metadata::MetadataOperation;
17
18#[cfg(target_os = "linux")]
19mod cgroups;
20#[cfg(target_os = "linux")]
21pub use self::cgroups::CgroupsMetadataCollector;
22
23#[cfg(unix)]
24mod containerd;
25#[cfg(unix)]
26pub use self::containerd::ContainerdMetadataCollector;
27
28#[async_trait]
34pub trait MetadataCollector {
35 fn name(&self) -> &'static str;
37
38 async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError>;
40}
41
42pub struct MetadataCollectorWorker {
44 name: &'static str,
45 state: Arc<Mutex<MetadataCollectorState>>,
46}
47
48struct MetadataCollectorState {
49 collector: Box<dyn MetadataCollector + Send>,
50 operations_tx: mpsc::Sender<MetadataOperation>,
51}
52
53impl MetadataCollectorWorker {
54 pub fn new<MC>(collector: MC, operations_tx: mpsc::Sender<MetadataOperation>) -> Self
56 where
57 MC: MetadataCollector + Send + 'static,
58 {
59 let name = collector.name();
60 Self {
61 name,
62 state: Arc::new(Mutex::new(MetadataCollectorState {
63 collector: Box::new(collector),
64 operations_tx,
65 })),
66 }
67 }
68}
69
70#[async_trait]
71impl Supervisable for MetadataCollectorWorker {
72 fn name(&self) -> &str {
73 self.name
74 }
75
76 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
77 let state = Arc::clone(&self.state);
78
79 Ok(Box::pin(async move {
80 let mut state_guard = state.lock_owned().await;
81
82 select! {
83 _ = process_shutdown => Ok(()),
84 result = run_collector(&mut state_guard) => result,
85 }
86 }))
87 }
88}
89
90async fn run_collector(state: &mut MetadataCollectorState) -> Result<(), GenericError> {
91 debug!(
92 collector_name = state.collector.name(),
93 "Starting metadata collector worker."
94 );
95
96 let MetadataCollectorState {
97 collector,
98 operations_tx,
99 } = state;
100
101 let result = collector.watch(operations_tx).await;
102 if let Err(e) = &result {
103 warn!(
104 error = %e,
105 collector_name = collector.name(),
106 "Failed to collect metadata. Sleeping 2s before retrying...",
107 );
108 sleep(Duration::from_secs(2)).await;
109 }
110
111 debug!(collector_name = collector.name(), "Metadata collector worker stopped.");
112
113 result
114}