saluki_env/workload/stores/
external_data.rs1use std::{num::NonZeroUsize, sync::Arc};
2
3use arc_swap::ArcSwap;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::collections::{FastHashSet, FastIndexMap};
6use saluki_context::origin::{ExternalData, RawExternalData};
7use saluki_metrics::static_metrics;
8use tracing::{debug, trace};
9
10use crate::workload::{
11 aggregator::MetadataStore, origin::ResolvedExternalData, EntityId, MetadataAction, MetadataOperation,
12};
13
14static_metrics!(
15 name => Telemetry,
16 prefix => external_data_store,
17 metrics => [
18 gauge(entity_limit),
19 gauge(active_entities),
20 counter(ops_delete_total),
21 counter(ops_attach_external_data_total),
22 ],
23);
24
25pub struct ExternalDataStore {
43 snapshot: Arc<ArcSwap<ExternalDataSnapshot>>,
44 entity_limit: NonZeroUsize,
45 active_entities: FastHashSet<EntityId>,
46 forward_mappings: FastIndexMap<ExternalData, ResolvedExternalData>,
47 reverse_mappings: FastIndexMap<EntityId, ExternalData>,
48 telemetry: Telemetry,
49}
50
51impl ExternalDataStore {
52 pub fn with_entity_limit(entity_limit: NonZeroUsize) -> Self {
57 let telemetry = Telemetry::new();
58 telemetry.entity_limit().set(entity_limit.get() as f64);
59
60 Self {
61 snapshot: Arc::new(ArcSwap::new(Arc::new(ExternalDataSnapshot::default()))),
62 entity_limit,
63 active_entities: FastHashSet::default(),
64 forward_mappings: FastIndexMap::default(),
65 reverse_mappings: FastIndexMap::default(),
66 telemetry,
67 }
68 }
69
70 pub fn entity_limit(&self) -> usize {
72 self.entity_limit.get()
73 }
74
75 fn track_entity(&mut self, entity_id: &EntityId) -> bool {
76 if self.active_entities.contains(entity_id) {
77 return true;
78 }
79
80 if self.active_entities.len() >= self.entity_limit() {
81 return false;
82 }
83
84 self.telemetry.active_entities().increment(1);
85 let _ = self.active_entities.insert(entity_id.clone());
86 true
87 }
88
89 fn add_mapping(&mut self, external_data: ExternalData, entity_id: EntityId) {
90 if !self.track_entity(&entity_id) {
91 trace!(
92 entity_limit = self.entity_limit(),
93 %entity_id,
94 "Entity limit reached, not adding mapping."
95 );
96 return;
97 }
98
99 let resolved = ResolvedExternalData::new(EntityId::PodUid(external_data.pod_uid().clone()), entity_id.clone());
102
103 let _ = self.forward_mappings.insert(external_data.clone(), resolved);
104 let _ = self.reverse_mappings.insert(entity_id, external_data);
105 }
106
107 fn remove_mapping(&mut self, entity_id: EntityId) {
108 if !self.active_entities.remove(&entity_id) {
109 return;
110 }
111
112 self.telemetry.active_entities().decrement(1);
113
114 if let Some(external_data) = self.reverse_mappings.swap_remove(&entity_id) {
115 let _ = self.forward_mappings.swap_remove(&external_data);
116 }
117 }
118
119 pub fn resolver(&self) -> ExternalDataStoreResolver {
121 ExternalDataStoreResolver {
122 snapshot: Arc::clone(&self.snapshot),
123 }
124 }
125}
126
127impl MetadataStore for ExternalDataStore {
128 fn name(&self) -> &'static str {
129 "external_data"
130 }
131
132 fn process_operation(&mut self, operation: MetadataOperation) {
133 debug!(?operation, "Processing metadata operation.");
134
135 let entity_id = operation.entity_id;
138 for action in operation.actions {
139 match action {
140 MetadataAction::AttachExternalData { external_data } => {
141 self.telemetry.ops_attach_external_data_total().increment(1);
142 self.add_mapping(external_data, entity_id.clone());
143 }
144 MetadataAction::Delete => {
145 self.telemetry.ops_delete_total().increment(1);
146 self.remove_mapping(entity_id.clone());
147 }
148
149 _ => {}
151 }
152 }
153
154 let snapshot = Arc::new(ExternalDataSnapshot {
156 forward_mappings: self.forward_mappings.clone(),
157 });
158
159 self.snapshot.store(snapshot);
160 }
161}
162
163impl MemoryBounds for ExternalDataStore {
164 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
165 builder
166 .firm()
167 .with_array::<EntityId>("entity ids", self.entity_limit())
169 .with_map::<ExternalData, EntityId>("ext data entity map", self.entity_limit())
171 .with_map::<EntityId, ExternalData>("entity ext data map", self.entity_limit());
172 }
173}
174
175#[derive(Default)]
176struct ExternalDataSnapshot {
177 forward_mappings: FastIndexMap<ExternalData, ResolvedExternalData>,
178}
179
180#[derive(Clone)]
182pub struct ExternalDataStoreResolver {
183 snapshot: Arc<ArcSwap<ExternalDataSnapshot>>,
184}
185
186impl ExternalDataStoreResolver {
187 pub fn resolve(&self, external_data: &RawExternalData<'_>) -> Option<ResolvedExternalData> {
194 let snapshot = self.snapshot.load();
195 snapshot.forward_mappings.get(external_data).cloned()
196 }
197
198 pub fn with_latest_snapshot<F>(&self, mut f: F)
200 where
201 F: FnMut(&ExternalData, &EntityId),
202 {
203 let snapshot = self.snapshot.load();
204 for (external_data, resolved_ed) in snapshot.forward_mappings.iter() {
205 f(external_data, resolved_ed.container_entity_id());
206 }
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use std::num::NonZeroUsize;
213
214 use saluki_context::origin::{ExternalData, RawExternalData};
215
216 use super::ExternalDataStore;
217 use crate::workload::{aggregator::MetadataStore as _, origin::ResolvedExternalData, EntityId, MetadataOperation};
218
219 const DEFAULT_ENTITY_LIMIT: NonZeroUsize = NonZeroUsize::new(10).unwrap();
220
221 fn entity_id_container(id: &str) -> EntityId {
222 EntityId::Container(id.into())
223 }
224
225 fn build_external_data(
226 pod_uid: &str, container_name: &str, container_id: &EntityId, init_container: bool,
227 ) -> (String, ExternalData, ResolvedExternalData) {
228 let raw_external_data = format!("pu-{},cn-{},it-{}", pod_uid, container_name, init_container);
229 let pod_entity_id = EntityId::from_pod_uid(pod_uid).unwrap();
230
231 let external_data = ExternalData::new(pod_uid.into(), container_name.into(), init_container);
232 let resolved_external_data = ResolvedExternalData::new(pod_entity_id.clone(), container_id.clone());
233 (raw_external_data, external_data, resolved_external_data)
234 }
235
236 #[test]
237 fn basic() {
238 let mut store = ExternalDataStore::with_entity_limit(DEFAULT_ENTITY_LIMIT);
239 let resolver = store.resolver();
240
241 let container_eid = entity_id_container("abcdef");
242 let (raw_ed, ed, resolved_ed) = build_external_data("1234", "redis", &container_eid, false);
243 let ed_ref = RawExternalData::try_from_str(&raw_ed).unwrap();
244
245 assert_eq!(resolver.resolve(&ed_ref), None);
247
248 store.process_operation(MetadataOperation::attach_external_data(container_eid.clone(), ed));
250
251 assert_eq!(resolver.resolve(&ed_ref), Some(resolved_ed));
253
254 store.process_operation(MetadataOperation::delete(container_eid));
256
257 assert_eq!(resolver.resolve(&ed_ref), None);
258 }
259
260 #[test]
261 fn obeys_entity_limit() {
262 let mut store = ExternalDataStore::with_entity_limit(NonZeroUsize::new(2).unwrap());
264 let resolver = store.resolver();
265
266 let container_eid1 = entity_id_container("abcdef");
268 let container_eid2 = entity_id_container("bcdefg");
269 let container_eid3 = entity_id_container("cdefgh");
270 let (raw_ed1, ed1, resolved_ed1) = build_external_data("1234", "redis", &container_eid1, false);
271 let (raw_ed2, ed2, resolved_ed2) = build_external_data("1234", "init-volume", &container_eid2, true);
272 let (raw_ed3, ed3, resolved_ed3) = build_external_data("1234", "chmod-dir", &container_eid3, true);
273 let ed_ref1 = RawExternalData::try_from_str(&raw_ed1).unwrap();
274 let ed_ref2 = RawExternalData::try_from_str(&raw_ed2).unwrap();
275 let ed_ref3 = RawExternalData::try_from_str(&raw_ed3).unwrap();
276
277 assert_eq!(resolver.resolve(&ed_ref1), None);
278 assert_eq!(resolver.resolve(&ed_ref2), None);
279 assert_eq!(resolver.resolve(&ed_ref3), None);
280
281 store.process_operation(MetadataOperation::attach_external_data(container_eid1.clone(), ed1));
283 store.process_operation(MetadataOperation::attach_external_data(container_eid2, ed2));
284 store.process_operation(MetadataOperation::attach_external_data(
285 container_eid3.clone(),
286 ed3.clone(),
287 ));
288
289 assert_eq!(resolver.resolve(&ed_ref1), Some(resolved_ed1));
292 assert_eq!(resolver.resolve(&ed_ref2), Some(resolved_ed2));
293 assert_eq!(resolver.resolve(&ed_ref3), None);
294
295 store.process_operation(MetadataOperation::delete(container_eid1));
297 assert_eq!(resolver.resolve(&ed_ref1), None);
298
299 store.process_operation(MetadataOperation::attach_external_data(container_eid3, ed3));
301 assert_eq!(resolver.resolve(&ed_ref3), Some(resolved_ed3));
302 }
303}