Skip to main content

saluki_env/workload/collectors/
mod.rs

1//! Workload metadata collection.
2
3use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use saluki_common::sync::shutdown::ShutdownHandle;
7use saluki_core::runtime::{InitializationError, Supervisable, SupervisorFuture};
8use saluki_error::GenericError;
9use tokio::{
10    select,
11    sync::{mpsc, Mutex},
12    time::sleep,
13};
14use tracing::{debug, warn};
15
16use super::metadata::MetadataOperation;
17
18#[cfg(target_os = "linux")]
19mod cgroups;
20#[cfg(target_os = "linux")]
21pub use self::cgroups::CgroupsMetadataCollector;
22
23#[cfg(unix)]
24mod containerd;
25#[cfg(unix)]
26pub use self::containerd::ContainerdMetadataCollector;
27
28/// A metadata collector.
29///
30/// Metadata collectors are responsible for collecting metadata from the environment, both at startup and over time as
31/// changes to the workload occur. This metadata can represent many things, from basic key/value pairs about specific
32/// entities, to fixed relationships between entities, to more dynamic information like the current state of a workload.
33#[async_trait]
34pub trait MetadataCollector {
35    /// Get the name of this collector.
36    fn name(&self) -> &'static str;
37
38    /// Watch for metadata changes.
39    async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError>;
40}
41
42/// A worker that drives a [`MetadataCollector`] and forwards metadata operations to a central aggregator.
43pub struct MetadataCollectorWorker {
44    name: &'static str,
45    state: Arc<Mutex<MetadataCollectorState>>,
46}
47
48struct MetadataCollectorState {
49    collector: Box<dyn MetadataCollector + Send>,
50    operations_tx: mpsc::Sender<MetadataOperation>,
51}
52
53impl MetadataCollectorWorker {
54    /// Create a new `MetadataCollectorWorker` from the given `collector` and operations sender.
55    pub fn new<MC>(collector: MC, operations_tx: mpsc::Sender<MetadataOperation>) -> Self
56    where
57        MC: MetadataCollector + Send + 'static,
58    {
59        let name = collector.name();
60        Self {
61            name,
62            state: Arc::new(Mutex::new(MetadataCollectorState {
63                collector: Box::new(collector),
64                operations_tx,
65            })),
66        }
67    }
68}
69
70#[async_trait]
71impl Supervisable for MetadataCollectorWorker {
72    fn name(&self) -> &str {
73        self.name
74    }
75
76    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
77        let state = Arc::clone(&self.state);
78
79        Ok(Box::pin(async move {
80            let mut state_guard = state.lock_owned().await;
81
82            select! {
83                _ = process_shutdown => Ok(()),
84                result = run_collector(&mut state_guard) => result,
85            }
86        }))
87    }
88}
89
90async fn run_collector(state: &mut MetadataCollectorState) -> Result<(), GenericError> {
91    debug!(
92        collector_name = state.collector.name(),
93        "Starting metadata collector worker."
94    );
95
96    let MetadataCollectorState {
97        collector,
98        operations_tx,
99    } = state;
100
101    let result = collector.watch(operations_tx).await;
102    if let Err(e) = &result {
103        warn!(
104            error = %e,
105            collector_name = collector.name(),
106            "Failed to collect metadata. Sleeping 2s before retrying...",
107        );
108        sleep(Duration::from_secs(2)).await;
109    }
110
111    debug!(collector_name = collector.name(), "Metadata collector worker stopped.");
112
113    result
114}