saluki_env/workload/
aggregator.rs1use std::sync::Arc;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_core::health::Health;
8use saluki_core::runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture};
9use saluki_error::{generic_error, GenericError};
10use tokio::{select, sync::mpsc, sync::Mutex};
11use tracing::debug;
12
13use super::metadata::MetadataOperation;
14
/// Capacity of the bounded channel that carries `MetadataOperation`s to the aggregator.
///
/// Used both when the channel is created and when the aggregator reports its memory bounds.
const OPERATIONS_CHANNEL_SIZE: usize = 128;
17
18pub struct MetadataAggregator {
24 state: Arc<Mutex<MetadataAggregatorState>>,
25}
26
27struct MetadataAggregatorState {
28 stores: Vec<Box<dyn MetadataStore + Send>>,
29 operations_rx: mpsc::Receiver<MetadataOperation>,
30 health: Health,
31}
32
33impl MetadataAggregator {
34 pub fn new(health: Health) -> (Self, mpsc::Sender<MetadataOperation>) {
37 let (operations_tx, operations_rx) = mpsc::channel(OPERATIONS_CHANNEL_SIZE);
38 let state = MetadataAggregatorState {
39 stores: Vec::new(),
40 operations_rx,
41 health,
42 };
43 (
44 Self {
45 state: Arc::new(Mutex::new(state)),
46 },
47 operations_tx,
48 )
49 }
50
51 pub fn add_store<S>(&mut self, store: S)
56 where
57 S: MetadataStore + Send + 'static,
58 {
59 let mut state = self
61 .state
62 .try_lock()
63 .expect("aggregator state should not be locked during setup");
64 state.stores.push(Box::new(store));
65 }
66}
67
68impl MemoryBounds for MetadataAggregator {
69 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
70 builder
71 .firm()
72 .with_array::<MetadataOperation>("metadata ops channel", OPERATIONS_CHANNEL_SIZE);
74
75 let state = self
76 .state
77 .try_lock()
78 .expect("aggregator state should not be locked during bounds calculation");
79 for store in &state.stores {
80 builder.with_subcomponent(store.name(), store);
81 }
82 }
83}
84
85#[async_trait]
86impl Supervisable for MetadataAggregator {
87 fn name(&self) -> &str {
88 "workload-aggregator"
89 }
90
91 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
92 let state = Arc::clone(&self.state);
93
94 Ok(Box::pin(async move {
95 let mut state_guard = state.lock_owned().await;
96
97 select! {
98 _ = process_shutdown => Ok(()),
99 result = run_aggregator(&mut state_guard) => result,
100 }
101 }))
102 }
103}
104
105async fn run_aggregator(state: &mut MetadataAggregatorState) -> Result<(), GenericError> {
106 debug!("Metadata aggregator started.");
107 state.health.mark_ready();
108
109 loop {
110 select! {
111 _ = state.health.live() => {},
112 maybe_operation = state.operations_rx.recv() => match maybe_operation {
113 Some(operation) => {
114 let stores_to_clone_for = state.stores.len().saturating_sub(1);
117 for store in state.stores.iter_mut().take(stores_to_clone_for) {
118 store.process_operation(operation.clone());
119 }
120
121 if let Some(last_store) = state.stores.last_mut() {
122 last_store.process_operation(operation);
123 }
124 },
125 None => {
126 debug!("Metadata aggregator operations channel closed. Stopping...");
127 break;
128 },
129 },
130 }
131 }
132
133 state.health.mark_not_ready();
134 debug!("Metadata aggregator stopped.");
135
136 Err(generic_error!(
137 "Metadata aggregator operation channel closed unexpectedly."
138 ))
139}
140
141pub trait MetadataStore: MemoryBounds {
143 fn name(&self) -> &'static str;
145
146 fn process_operation(&mut self, operation: MetadataOperation);
148}