Skip to main content

saluki_env/workload/
aggregator.rs

1//! Workload metadata aggregation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::sync::shutdown::ShutdownHandle;
8use saluki_core::health::Health;
9use saluki_core::runtime::{InitializationError, Supervisable, SupervisorFuture};
10use saluki_error::{generic_error, GenericError};
11use tokio::{select, sync::mpsc, sync::Mutex};
12use tracing::debug;
13
14use super::metadata::MetadataOperation;
15
/// Capacity of the bounded channel that carries [`MetadataOperation`]s from collectors to the
/// aggregator.
///
/// The same value is used when declaring the aggregator's memory bounds, so the two stay in sync.
// TODO: Make this configurable.
const OPERATIONS_CHANNEL_SIZE: usize = 128;
18
19/// Metadata aggregator based on configurable collectors.
20///
21/// Metadata collectors are used to either scrape or listen for changes in workload metadata, which converts those
22/// changes into [`MetadataOperation`]s that are applied to a [`MetadataStore`]. `MetadataAggregator` receives those
23/// operations and applies them to all configured [`MetadataStore`]s.
24pub struct MetadataAggregator {
25    state: Arc<Mutex<MetadataAggregatorState>>,
26}
27
28struct MetadataAggregatorState {
29    stores: Vec<Box<dyn MetadataStore + Send>>,
30    operations_rx: mpsc::Receiver<MetadataOperation>,
31    health: Health,
32}
33
34impl MetadataAggregator {
35    /// Creates a new `MetadataAggregator`, returning it along with the operations sender that should be cloned into
36    /// each collector worker.
37    pub fn new(health: Health) -> (Self, mpsc::Sender<MetadataOperation>) {
38        let (operations_tx, operations_rx) = mpsc::channel(OPERATIONS_CHANNEL_SIZE);
39        let state = MetadataAggregatorState {
40            stores: Vec::new(),
41            operations_rx,
42            health,
43        };
44        (
45            Self {
46                state: Arc::new(Mutex::new(state)),
47            },
48            operations_tx,
49        )
50    }
51
52    /// Adds a metadata store to the aggregator.
53    ///
54    /// This store will receive a copy of every metadata operation that's emitted from the configured metadata
55    /// collectors.
56    pub fn add_store<S>(&mut self, store: S)
57    where
58        S: MetadataStore + Send + 'static,
59    {
60        // Setup-time access: nothing else holds the lock yet, so this never contends.
61        let mut state = self
62            .state
63            .try_lock()
64            .expect("aggregator state should not be locked during setup");
65        state.stores.push(Box::new(store));
66    }
67}
68
69impl MemoryBounds for MetadataAggregator {
70    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
71        builder
72            .firm()
73            // Operations channel.
74            .with_array::<MetadataOperation>("metadata ops channel", OPERATIONS_CHANNEL_SIZE);
75
76        let state = self
77            .state
78            .try_lock()
79            .expect("aggregator state should not be locked during bounds calculation");
80        for store in &state.stores {
81            builder.with_subcomponent(store.name(), store);
82        }
83    }
84}
85
86#[async_trait]
87impl Supervisable for MetadataAggregator {
88    fn name(&self) -> &str {
89        "workload-aggregator"
90    }
91
92    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
93        let state = Arc::clone(&self.state);
94
95        Ok(Box::pin(async move {
96            let mut state_guard = state.lock_owned().await;
97
98            select! {
99                _ = process_shutdown => Ok(()),
100                result = run_aggregator(&mut state_guard) => result,
101            }
102        }))
103    }
104}
105
106async fn run_aggregator(state: &mut MetadataAggregatorState) -> Result<(), GenericError> {
107    debug!("Metadata aggregator started.");
108    state.health.mark_ready();
109
110    loop {
111        select! {
112            _ = state.health.live() => {},
113            maybe_operation = state.operations_rx.recv() => match maybe_operation {
114                Some(operation) => {
115                    // Send the operation to all stores, taking care to only clone the operation if we have two
116                    // or more stores configured.
117                    let stores_to_clone_for = state.stores.len().saturating_sub(1);
118                    for store in state.stores.iter_mut().take(stores_to_clone_for) {
119                        store.process_operation(operation.clone());
120                    }
121
122                    if let Some(last_store) = state.stores.last_mut() {
123                        last_store.process_operation(operation);
124                    }
125                },
126                None => {
127                    debug!("Metadata aggregator operations channel closed. Stopping...");
128                    break;
129                },
130            },
131        }
132    }
133
134    state.health.mark_not_ready();
135    debug!("Metadata aggregator stopped.");
136
137    Err(generic_error!(
138        "Metadata aggregator operation channel closed unexpectedly."
139    ))
140}
141
142/// A store which receives a stream of metadata operations.
143pub trait MetadataStore: MemoryBounds {
144    /// Returns the name of the store.
145    fn name(&self) -> &'static str;
146
147    /// Process a metadata operation.
148    fn process_operation(&mut self, operation: MetadataOperation);
149}