Skip to main content

saluki_env/workload/collectors/
mod.rs

1//! Workload metadata collection.
2
3use std::{sync::Arc, time::Duration};
4
5use async_trait::async_trait;
6use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
7use saluki_error::GenericError;
8use tokio::{
9    select,
10    sync::{mpsc, Mutex},
11    time::sleep,
12};
13use tracing::{debug, warn};
14
15use super::metadata::MetadataOperation;
16
17#[cfg(target_os = "linux")]
18mod cgroups;
19#[cfg(target_os = "linux")]
20pub use self::cgroups::CgroupsMetadataCollector;
21
22mod containerd;
23pub use self::containerd::ContainerdMetadataCollector;
24
25/// A metadata collector.
26///
27/// Metadata collectors are responsible for collecting metadata from the environment, both at startup and over time as
28/// changes to the workload occur. This metadata can represent many things, from basic key/value pairs about specific
29/// entities, to fixed relationships between entities, to more dynamic information like the current state of a workload.
30#[async_trait]
31pub trait MetadataCollector {
32    /// Get the name of this collector.
33    fn name(&self) -> &'static str;
34
35    /// Watch for metadata changes.
36    async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError>;
37}
38
39/// A worker that drives a [`MetadataCollector`] and forwards metadata operations to a central aggregator.
40pub struct MetadataCollectorWorker {
41    name: &'static str,
42    state: Arc<Mutex<MetadataCollectorState>>,
43}
44
45struct MetadataCollectorState {
46    collector: Box<dyn MetadataCollector + Send>,
47    operations_tx: mpsc::Sender<MetadataOperation>,
48}
49
50impl MetadataCollectorWorker {
51    /// Create a new `MetadataCollectorWorker` from the given `collector` and operations sender.
52    pub fn new<MC>(collector: MC, operations_tx: mpsc::Sender<MetadataOperation>) -> Self
53    where
54        MC: MetadataCollector + Send + 'static,
55    {
56        let name = collector.name();
57        Self {
58            name,
59            state: Arc::new(Mutex::new(MetadataCollectorState {
60                collector: Box::new(collector),
61                operations_tx,
62            })),
63        }
64    }
65}
66
67#[async_trait]
68impl Supervisable for MetadataCollectorWorker {
69    fn name(&self) -> &str {
70        self.name
71    }
72
73    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
74        let state = Arc::clone(&self.state);
75
76        Ok(Box::pin(async move {
77            let mut state_guard = state.lock_owned().await;
78
79            select! {
80                _ = process_shutdown => Ok(()),
81                result = run_collector(&mut state_guard) => result,
82            }
83        }))
84    }
85}
86
87async fn run_collector(state: &mut MetadataCollectorState) -> Result<(), GenericError> {
88    debug!(
89        collector_name = state.collector.name(),
90        "Starting metadata collector worker."
91    );
92
93    let MetadataCollectorState {
94        collector,
95        operations_tx,
96    } = state;
97
98    let result = collector.watch(operations_tx).await;
99    if let Err(e) = &result {
100        warn!(
101            error = %e,
102            collector_name = collector.name(),
103            "Failed to collect metadata. Sleeping 2s before retrying...",
104        );
105        sleep(Duration::from_secs(2)).await;
106    }
107
108    debug!(collector_name = collector.name(), "Metadata collector worker stopped.");
109
110    result
111}