Skip to main content

saluki_env/workload/
aggregator.rs

1//! Workload metadata aggregation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_core::health::Health;
8use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
9use saluki_error::{generic_error, GenericError};
10use tokio::{select, sync::mpsc, sync::Mutex};
11use tracing::debug;
12
13use super::metadata::MetadataOperation;
14
/// Capacity, in operations, of the bounded channel created by `MetadataAggregator::new`.
///
/// The same value sizes the "metadata ops channel" entry that the aggregator reports through its `MemoryBounds`
/// implementation, so the two stay in agreement.
// TODO: Make this configurable.
const OPERATIONS_CHANNEL_SIZE: usize = 128;
17
/// Metadata aggregator based on configurable collectors.
///
/// Metadata collectors either scrape or listen for changes in workload metadata and convert those changes into
/// [`MetadataOperation`]s. `MetadataAggregator` receives those operations over a bounded channel and applies each
/// one to every configured [`MetadataStore`].
pub struct MetadataAggregator {
    // Shared behind `Arc<Mutex<_>>` so that the future returned from `initialize` can take an owned lock guard on
    // the state while the aggregator itself is only borrowed through `&self`.
    state: Arc<Mutex<MetadataAggregatorState>>,
}
26
/// Mutable state of a [`MetadataAggregator`], kept behind the aggregator's mutex.
struct MetadataAggregatorState {
    // Stores that each receive every operation, in the order they were added.
    stores: Vec<Box<dyn MetadataStore + Send>>,
    // Receiving half of the operations channel created in `MetadataAggregator::new`.
    operations_rx: mpsc::Receiver<MetadataOperation>,
    // Health handle: marked ready when the run loop starts, marked not ready once the operations channel closes,
    // and polled via `live()` on every loop iteration.
    health: Health,
}
32
33impl MetadataAggregator {
34    /// Creates a new `MetadataAggregator`, returning it along with the operations sender that should be cloned into
35    /// each collector worker.
36    pub fn new(health: Health) -> (Self, mpsc::Sender<MetadataOperation>) {
37        let (operations_tx, operations_rx) = mpsc::channel(OPERATIONS_CHANNEL_SIZE);
38        let state = MetadataAggregatorState {
39            stores: Vec::new(),
40            operations_rx,
41            health,
42        };
43        (
44            Self {
45                state: Arc::new(Mutex::new(state)),
46            },
47            operations_tx,
48        )
49    }
50
51    /// Adds a metadata store to the aggregator.
52    ///
53    /// This store will receive a copy of every metadata operation that's emitted from the configured metadata
54    /// collectors.
55    pub fn add_store<S>(&mut self, store: S)
56    where
57        S: MetadataStore + Send + 'static,
58    {
59        // Setup-time access: nothing else holds the lock yet, so this never contends.
60        let mut state = self
61            .state
62            .try_lock()
63            .expect("aggregator state should not be locked during setup");
64        state.stores.push(Box::new(store));
65    }
66}
67
68impl MemoryBounds for MetadataAggregator {
69    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
70        builder
71            .firm()
72            // Operations channel.
73            .with_array::<MetadataOperation>("metadata ops channel", OPERATIONS_CHANNEL_SIZE);
74
75        let state = self
76            .state
77            .try_lock()
78            .expect("aggregator state should not be locked during bounds calculation");
79        for store in &state.stores {
80            builder.with_subcomponent(store.name(), store);
81        }
82    }
83}
84
85#[async_trait]
86impl Supervisable for MetadataAggregator {
87    fn name(&self) -> &str {
88        "workload-aggregator"
89    }
90
91    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
92        let state = Arc::clone(&self.state);
93
94        Ok(Box::pin(async move {
95            let mut state_guard = state.lock_owned().await;
96
97            select! {
98                _ = process_shutdown => Ok(()),
99                result = run_aggregator(&mut state_guard) => result,
100            }
101        }))
102    }
103}
104
105async fn run_aggregator(state: &mut MetadataAggregatorState) -> Result<(), GenericError> {
106    debug!("Metadata aggregator started.");
107    state.health.mark_ready();
108
109    loop {
110        select! {
111            _ = state.health.live() => {},
112            maybe_operation = state.operations_rx.recv() => match maybe_operation {
113                Some(operation) => {
114                    // Send the operation to all stores, taking care to only clone the operation if we have two
115                    // or more stores configured.
116                    let stores_to_clone_for = state.stores.len().saturating_sub(1);
117                    for store in state.stores.iter_mut().take(stores_to_clone_for) {
118                        store.process_operation(operation.clone());
119                    }
120
121                    if let Some(last_store) = state.stores.last_mut() {
122                        last_store.process_operation(operation);
123                    }
124                },
125                None => {
126                    debug!("Metadata aggregator operations channel closed. Stopping...");
127                    break;
128                },
129            },
130        }
131    }
132
133    state.health.mark_not_ready();
134    debug!("Metadata aggregator stopped.");
135
136    Err(generic_error!(
137        "Metadata aggregator operation channel closed unexpectedly."
138    ))
139}
140
/// A store which receives a stream of metadata operations.
///
/// Stores are registered through `MetadataAggregator::add_store`. Because `MemoryBounds` is a supertrait, each
/// store also reports its own memory usage, which the aggregator includes as a subcomponent of its own bounds.
pub trait MetadataStore: MemoryBounds {
    /// Returns the name of the store.
    ///
    /// The aggregator uses this name to label the store's subcomponent when specifying memory bounds.
    fn name(&self) -> &'static str;

    /// Process a metadata operation.
    ///
    /// The aggregator calls this once for every operation it receives, passing each store its own owned copy
    /// of the operation.
    fn process_operation(&mut self, operation: MetadataOperation);
}