saluki_env/workload/collectors/
mod.rs1use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
7use saluki_error::GenericError;
8use tokio::{
9 select,
10 sync::{mpsc, Mutex},
11 time::sleep,
12};
13use tracing::{debug, warn};
14
15use super::metadata::MetadataOperation;
16
17#[cfg(target_os = "linux")]
18mod cgroups;
19#[cfg(target_os = "linux")]
20pub use self::cgroups::CgroupsMetadataCollector;
21
22mod containerd;
23pub use self::containerd::ContainerdMetadataCollector;
24
25#[async_trait]
31pub trait MetadataCollector {
32 fn name(&self) -> &'static str;
34
35 async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError>;
37}
38
39pub struct MetadataCollectorWorker {
41 name: &'static str,
42 state: Arc<Mutex<MetadataCollectorState>>,
43}
44
45struct MetadataCollectorState {
46 collector: Box<dyn MetadataCollector + Send>,
47 operations_tx: mpsc::Sender<MetadataOperation>,
48}
49
50impl MetadataCollectorWorker {
51 pub fn new<MC>(collector: MC, operations_tx: mpsc::Sender<MetadataOperation>) -> Self
53 where
54 MC: MetadataCollector + Send + 'static,
55 {
56 let name = collector.name();
57 Self {
58 name,
59 state: Arc::new(Mutex::new(MetadataCollectorState {
60 collector: Box::new(collector),
61 operations_tx,
62 })),
63 }
64 }
65}
66
67#[async_trait]
68impl Supervisable for MetadataCollectorWorker {
69 fn name(&self) -> &str {
70 self.name
71 }
72
73 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
74 let state = Arc::clone(&self.state);
75
76 Ok(Box::pin(async move {
77 let mut state_guard = state.lock_owned().await;
78
79 select! {
80 _ = process_shutdown => Ok(()),
81 result = run_collector(&mut state_guard) => result,
82 }
83 }))
84 }
85}
86
87async fn run_collector(state: &mut MetadataCollectorState) -> Result<(), GenericError> {
88 debug!(
89 collector_name = state.collector.name(),
90 "Starting metadata collector worker."
91 );
92
93 let MetadataCollectorState {
94 collector,
95 operations_tx,
96 } = state;
97
98 let result = collector.watch(operations_tx).await;
99 if let Err(e) = &result {
100 warn!(
101 error = %e,
102 collector_name = collector.name(),
103 "Failed to collect metadata. Sleeping 2s before retrying...",
104 );
105 sleep(Duration::from_secs(2)).await;
106 }
107
108 debug!(collector_name = collector.name(), "Metadata collector worker stopped.");
109
110 result
111}