saluki_env/workload/
aggregator.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_common::sync::shutdown::ShutdownHandle;
8use saluki_core::health::Health;
9use saluki_core::runtime::{InitializationError, Supervisable, SupervisorFuture};
10use saluki_error::{generic_error, GenericError};
11use tokio::{select, sync::mpsc, sync::Mutex};
12use tracing::debug;
13
14use super::metadata::MetadataOperation;
15
/// Capacity of the bounded channel over which `MetadataOperation`s are sent to the aggregator.
///
/// The same value is used when declaring the channel's memory bounds.
const OPERATIONS_CHANNEL_SIZE: usize = 1_2_8;
18
19pub struct MetadataAggregator {
25 state: Arc<Mutex<MetadataAggregatorState>>,
26}
27
28struct MetadataAggregatorState {
29 stores: Vec<Box<dyn MetadataStore + Send>>,
30 operations_rx: mpsc::Receiver<MetadataOperation>,
31 health: Health,
32}
33
34impl MetadataAggregator {
35 pub fn new(health: Health) -> (Self, mpsc::Sender<MetadataOperation>) {
38 let (operations_tx, operations_rx) = mpsc::channel(OPERATIONS_CHANNEL_SIZE);
39 let state = MetadataAggregatorState {
40 stores: Vec::new(),
41 operations_rx,
42 health,
43 };
44 (
45 Self {
46 state: Arc::new(Mutex::new(state)),
47 },
48 operations_tx,
49 )
50 }
51
52 pub fn add_store<S>(&mut self, store: S)
57 where
58 S: MetadataStore + Send + 'static,
59 {
60 let mut state = self
62 .state
63 .try_lock()
64 .expect("aggregator state should not be locked during setup");
65 state.stores.push(Box::new(store));
66 }
67}
68
69impl MemoryBounds for MetadataAggregator {
70 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
71 builder
72 .firm()
73 .with_array::<MetadataOperation>("metadata ops channel", OPERATIONS_CHANNEL_SIZE);
75
76 let state = self
77 .state
78 .try_lock()
79 .expect("aggregator state should not be locked during bounds calculation");
80 for store in &state.stores {
81 builder.with_subcomponent(store.name(), store);
82 }
83 }
84}
85
86#[async_trait]
87impl Supervisable for MetadataAggregator {
88 fn name(&self) -> &str {
89 "workload-aggregator"
90 }
91
92 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
93 let state = Arc::clone(&self.state);
94
95 Ok(Box::pin(async move {
96 let mut state_guard = state.lock_owned().await;
97
98 select! {
99 _ = process_shutdown => Ok(()),
100 result = run_aggregator(&mut state_guard) => result,
101 }
102 }))
103 }
104}
105
106async fn run_aggregator(state: &mut MetadataAggregatorState) -> Result<(), GenericError> {
107 debug!("Metadata aggregator started.");
108 state.health.mark_ready();
109
110 loop {
111 select! {
112 _ = state.health.live() => {},
113 maybe_operation = state.operations_rx.recv() => match maybe_operation {
114 Some(operation) => {
115 let stores_to_clone_for = state.stores.len().saturating_sub(1);
118 for store in state.stores.iter_mut().take(stores_to_clone_for) {
119 store.process_operation(operation.clone());
120 }
121
122 if let Some(last_store) = state.stores.last_mut() {
123 last_store.process_operation(operation);
124 }
125 },
126 None => {
127 debug!("Metadata aggregator operations channel closed. Stopping...");
128 break;
129 },
130 },
131 }
132 }
133
134 state.health.mark_not_ready();
135 debug!("Metadata aggregator stopped.");
136
137 Err(generic_error!(
138 "Metadata aggregator operation channel closed unexpectedly."
139 ))
140}
141
142pub trait MetadataStore: MemoryBounds {
144 fn name(&self) -> &'static str;
146
147 fn process_operation(&mut self, operation: MetadataOperation);
149}