Skip to main content

saluki_env/workload/stores/
external_data.rs

1use std::{num::NonZeroUsize, sync::Arc};
2
3use arc_swap::ArcSwap;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::collections::{FastHashSet, FastIndexMap};
6use saluki_context::origin::{ExternalData, RawExternalData};
7use saluki_metrics::static_metrics;
8use tracing::{debug, trace};
9
10use crate::workload::{
11    aggregator::MetadataStore, origin::ResolvedExternalData, EntityId, MetadataAction, MetadataOperation,
12};
13
14static_metrics!(
15    name => Telemetry,
16    prefix => external_data_store,
17    metrics => [
18        gauge(entity_limit),
19        gauge(active_entities),
20        counter(ops_delete_total),
21        counter(ops_attach_external_data_total),
22   ],
23);
24
25/// A store for External Data entity mappings.
26///
27/// "External Data" is a concept that's used to aid origin detection of workloads running in Kubernetes environments
28/// where introspection isn't possible or may return incorrect information. Origin detection generally centers around
29/// determining the container where a metric originates from, and then enriching the metric with tags that describe that
30/// container, as well as the pod the container is running within, and so on. In some cases, the origin of a metric
31/// can't be detected from the outside (such as by using peer credentials over Unix Domain sockets) and can't be
32/// detected by the workload itself (such as when running in nested virtualization environments). In these cases, we
33/// need a mechanism to attach metadata to the workload such that it can send the necessary information to allow for the
34/// origin of a metric to be correctly detected.
35///
36/// "External Data" supports this by allowing for an external Kubernetes admission controller to attach specific
37/// metadata -- pod UID and container name -- to application pods, which is then read and sent along with metrics. This
38/// information is then used during origin detection in order to correlate the container ID of the origin, which is
39/// sufficient to allow enriching the metric with container-specific tags.
40///
41/// See [`ExternalData`] for more information on the External Data format itself.
42pub struct ExternalDataStore {
43    snapshot: Arc<ArcSwap<ExternalDataSnapshot>>,
44    entity_limit: NonZeroUsize,
45    active_entities: FastHashSet<EntityId>,
46    forward_mappings: FastIndexMap<ExternalData, ResolvedExternalData>,
47    reverse_mappings: FastIndexMap<EntityId, ExternalData>,
48    telemetry: Telemetry,
49}
50
51impl ExternalDataStore {
52    /// Creates a new `ExternalDataStore` with the given entity limit.
53    ///
54    /// The entity limit is the maximum number of unique entities that can be stored. Once the limit is reached, new
55    /// entities won't be added to the store.
56    pub fn with_entity_limit(entity_limit: NonZeroUsize) -> Self {
57        let telemetry = Telemetry::new();
58        telemetry.entity_limit().set(entity_limit.get() as f64);
59
60        Self {
61            snapshot: Arc::new(ArcSwap::new(Arc::new(ExternalDataSnapshot::default()))),
62            entity_limit,
63            active_entities: FastHashSet::default(),
64            forward_mappings: FastIndexMap::default(),
65            reverse_mappings: FastIndexMap::default(),
66            telemetry,
67        }
68    }
69
70    /// Returns the maximum number of unique entities that can be tracked by the store at any given time.
71    pub fn entity_limit(&self) -> usize {
72        self.entity_limit.get()
73    }
74
75    fn track_entity(&mut self, entity_id: &EntityId) -> bool {
76        if self.active_entities.contains(entity_id) {
77            return true;
78        }
79
80        if self.active_entities.len() >= self.entity_limit() {
81            return false;
82        }
83
84        self.telemetry.active_entities().increment(1);
85        let _ = self.active_entities.insert(entity_id.clone());
86        true
87    }
88
89    fn add_mapping(&mut self, external_data: ExternalData, entity_id: EntityId) {
90        if !self.track_entity(&entity_id) {
91            trace!(
92                entity_limit = self.entity_limit(),
93                %entity_id,
94                "Entity limit reached, not adding mapping."
95            );
96            return;
97        }
98
99        // We create a "resolved" form of the External Data, which includes entity IDs for both the pod and the
100        // container that this External Data is attached to.
101        let resolved = ResolvedExternalData::new(EntityId::PodUid(external_data.pod_uid().clone()), entity_id.clone());
102
103        let _ = self.forward_mappings.insert(external_data.clone(), resolved);
104        let _ = self.reverse_mappings.insert(entity_id, external_data);
105    }
106
107    fn remove_mapping(&mut self, entity_id: EntityId) {
108        if !self.active_entities.remove(&entity_id) {
109            return;
110        }
111
112        self.telemetry.active_entities().decrement(1);
113
114        if let Some(external_data) = self.reverse_mappings.swap_remove(&entity_id) {
115            let _ = self.forward_mappings.swap_remove(&external_data);
116        }
117    }
118
119    /// Returns a `ExternalDataStoreResolver` that can be used to concurrently resolve entity IDs from External Data.
120    pub fn resolver(&self) -> ExternalDataStoreResolver {
121        ExternalDataStoreResolver {
122            snapshot: Arc::clone(&self.snapshot),
123        }
124    }
125}
126
127impl MetadataStore for ExternalDataStore {
128    fn name(&self) -> &'static str {
129        "external_data"
130    }
131
132    fn process_operation(&mut self, operation: MetadataOperation) {
133        debug!(?operation, "Processing metadata operation.");
134
135        // TODO: Maybe come up with a better pattern for doing "only clone for the first N-1 actions, don't clone for the
136        // Nth" since we're needlessly cloning a lot with this current approach.
137        let entity_id = operation.entity_id;
138        for action in operation.actions {
139            match action {
140                MetadataAction::AttachExternalData { external_data } => {
141                    self.telemetry.ops_attach_external_data_total().increment(1);
142                    self.add_mapping(external_data, entity_id.clone());
143                }
144                MetadataAction::Delete => {
145                    self.telemetry.ops_delete_total().increment(1);
146                    self.remove_mapping(entity_id.clone());
147                }
148
149                // We only care about external data, and knowing when to clean up mappings.
150                _ => {}
151            }
152        }
153
154        // Update the snapshot.
155        let snapshot = Arc::new(ExternalDataSnapshot {
156            forward_mappings: self.forward_mappings.clone(),
157        });
158
159        self.snapshot.store(snapshot);
160    }
161}
162
163impl MemoryBounds for ExternalDataStore {
164    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
165        builder
166            .firm()
167            // Active entities.
168            .with_array::<EntityId>("entity ids", self.entity_limit())
169            // Forward and reverse mappings.
170            .with_map::<ExternalData, EntityId>("ext data entity map", self.entity_limit())
171            .with_map::<EntityId, ExternalData>("entity ext data map", self.entity_limit());
172    }
173}
174
175#[derive(Default)]
176struct ExternalDataSnapshot {
177    forward_mappings: FastIndexMap<ExternalData, ResolvedExternalData>,
178}
179
180/// A handle for resolving entity IDs from an `ExternalDataStore`.
181#[derive(Clone)]
182pub struct ExternalDataStoreResolver {
183    snapshot: Arc<ArcSwap<ExternalDataSnapshot>>,
184}
185
186impl ExternalDataStoreResolver {
187    /// Resolves the given raw external data.
188    ///
189    /// The given raw external data is parsed and lookups are performed to resolve the underlying entity IDs (pod and
190    /// container). If the external data maps to valid, and the referenced entities exist, `Some(ResolvedExternalData)`
191    /// is returned, containing the entity IDs for both pod and container. Otherwise, if the raw external data is
192    /// invalid, or the referenced entities don't exist, `None` is returned.
193    pub fn resolve(&self, external_data: &RawExternalData<'_>) -> Option<ResolvedExternalData> {
194        let snapshot = self.snapshot.load();
195        snapshot.forward_mappings.get(external_data).cloned()
196    }
197
198    /// Executes the given function for each forward mapping in the latest snapshot.
199    pub fn with_latest_snapshot<F>(&self, mut f: F)
200    where
201        F: FnMut(&ExternalData, &EntityId),
202    {
203        let snapshot = self.snapshot.load();
204        for (external_data, resolved_ed) in snapshot.forward_mappings.iter() {
205            f(external_data, resolved_ed.container_entity_id());
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use saluki_context::origin::{ExternalData, RawExternalData};

    use super::ExternalDataStore;
    use crate::workload::{aggregator::MetadataStore as _, origin::ResolvedExternalData, EntityId, MetadataOperation};

    const DEFAULT_ENTITY_LIMIT: NonZeroUsize = NonZeroUsize::new(10).unwrap();

    /// Builds a container entity ID from a container ID.
    fn entity_id_container(id: &str) -> EntityId {
        EntityId::Container(id.into())
    }

    /// Builds three forms of the same External Data for the given pod UID, container name and container entity:
    /// the raw string form, the parsed form, and the resolved form we expect the store to hand back.
    fn build_external_data(
        pod_uid: &str, container_name: &str, container_id: &EntityId, init_container: bool,
    ) -> (String, ExternalData, ResolvedExternalData) {
        let raw = format!("pu-{},cn-{},it-{}", pod_uid, container_name, init_container);
        let pod = EntityId::from_pod_uid(pod_uid).unwrap();
        let parsed = ExternalData::new(pod_uid.into(), container_name.into(), init_container);
        let expected = ResolvedExternalData::new(pod, container_id.clone());
        (raw, parsed, expected)
    }

    #[test]
    fn basic() {
        let mut store = ExternalDataStore::with_entity_limit(DEFAULT_ENTITY_LIMIT);
        let resolver = store.resolver();

        let container = entity_id_container("abcdef");
        let (raw, ed, expected) = build_external_data("1234", "redis", &container, false);
        let lookup = RawExternalData::try_from_str(&raw).unwrap();

        // Nothing has been attached yet, so resolution should come back empty.
        assert_eq!(resolver.resolve(&lookup), None);

        // Once attached to the container, the External Data resolves.
        store.process_operation(MetadataOperation::attach_external_data(container.clone(), ed));
        assert_eq!(resolver.resolve(&lookup), Some(expected));

        // Deleting the container drops the attached External Data again.
        store.process_operation(MetadataOperation::delete(container));
        assert_eq!(resolver.resolve(&lookup), None);
    }

    #[test]
    fn obeys_entity_limit() {
        // Only two entities may be tracked at once.
        let mut store = ExternalDataStore::with_entity_limit(NonZeroUsize::new(2).unwrap());
        let resolver = store.resolver();

        let first = entity_id_container("abcdef");
        let second = entity_id_container("bcdefg");
        let third = entity_id_container("cdefgh");
        let (raw1, ed1, expected1) = build_external_data("1234", "redis", &first, false);
        let (raw2, ed2, expected2) = build_external_data("1234", "init-volume", &second, true);
        let (raw3, ed3, expected3) = build_external_data("1234", "chmod-dir", &third, true);
        let lookup1 = RawExternalData::try_from_str(&raw1).unwrap();
        let lookup2 = RawExternalData::try_from_str(&raw2).unwrap();
        let lookup3 = RawExternalData::try_from_str(&raw3).unwrap();

        // None of the External Data has been attached yet.
        for lookup in [&lookup1, &lookup2, &lookup3] {
            assert_eq!(resolver.resolve(lookup), None);
        }

        // Try to attach External Data to all three containers.
        store.process_operation(MetadataOperation::attach_external_data(first.clone(), ed1));
        store.process_operation(MetadataOperation::attach_external_data(second, ed2));
        store.process_operation(MetadataOperation::attach_external_data(third.clone(), ed3.clone()));

        // The limit was hit after the first two containers, so the third attachment was dropped.
        assert_eq!(resolver.resolve(&lookup1), Some(expected1));
        assert_eq!(resolver.resolve(&lookup2), Some(expected2));
        assert_eq!(resolver.resolve(&lookup3), None);

        // Deleting the first container drops its External Data and frees up a slot...
        store.process_operation(MetadataOperation::delete(first));
        assert_eq!(resolver.resolve(&lookup1), None);

        // ...so attaching to the third container now succeeds.
        store.process_operation(MetadataOperation::attach_external_data(third, ed3));
        assert_eq!(resolver.resolve(&lookup3), Some(expected3));
    }
}