saluki_env/workload/
origin.rs

1//! Origin detection and resolution.
2
3use std::{num::NonZeroUsize, sync::Arc, time::Duration};
4
5use saluki_common::{
6    cache::{Cache, CacheBuilder},
7    hash::hash_single_fast,
8};
9use saluki_context::origin::{OriginTagCardinality, RawOrigin};
10use tracing::trace;
11
12use super::stores::ExternalDataStoreResolver;
13use crate::workload::EntityId;
14
/// Capacity handed to the origin cache builder.
///
/// The literal is non-zero, so the `None` arm can never be taken; the check is evaluated at compile time.
const DEFAULT_ORIGIN_CACHE_ITEM_LIMIT: NonZeroUsize = match NonZeroUsize::new(500_000) {
    Some(limit) => limit,
    None => panic!("origin cache item limit must be non-zero"),
};

/// Time-to-idle handed to the origin cache builder.
const DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE: Duration = Duration::from_secs(30);
18
19/// A resolved External Data entry.
20#[derive(Clone, Debug, Eq, Hash, PartialEq)]
21pub struct ResolvedExternalData {
22    pod_entity_id: EntityId,
23    container_entity_id: EntityId,
24}
25
26impl ResolvedExternalData {
27    /// Creates a new `ResolvedExternalData` from the given pod and container entity IDs.
28    pub fn new(pod_entity_id: EntityId, container_entity_id: EntityId) -> Self {
29        Self {
30            pod_entity_id,
31            container_entity_id,
32        }
33    }
34
35    /// Returns a reference to the pod entity ID.
36    pub fn pod_entity_id(&self) -> &EntityId {
37        &self.pod_entity_id
38    }
39
40    /// Returns a reference to the container entity ID.
41    pub fn container_entity_id(&self) -> &EntityId {
42        &self.container_entity_id
43    }
44}
45
46#[derive(Debug, Hash, Eq, PartialEq)]
47struct ResolvedOriginInner {
48    cardinality: Option<OriginTagCardinality>,
49    process_id: Option<EntityId>,
50    container_id: Option<EntityId>,
51    pod_uid: Option<EntityId>,
52    resolved_external_data: Option<ResolvedExternalData>,
53}
54
55/// An resolved representation of `RawOrigin<'a>`
56///
57/// This representation is used to store the pre-calculated entity IDs derived from a borrowed `RawOrigin<'a>` in order
58/// to speed the lookup of origin tags attached to each individual entity ID that comprises an origin.
59///
60/// This type can be cheaply cloned and shared across threads.
61#[derive(Clone, Debug, Hash, Eq, PartialEq)]
62pub struct ResolvedOrigin {
63    inner: Arc<ResolvedOriginInner>,
64}
65
66impl ResolvedOrigin {
67    /// Creates a new `ResolvedOrigin` from the given parts.
68    pub fn from_parts(
69        cardinality: Option<OriginTagCardinality>, process_id: Option<EntityId>, container_id: Option<EntityId>,
70        pod_uid: Option<EntityId>, resolved_external_data: Option<ResolvedExternalData>,
71    ) -> Self {
72        Self {
73            inner: Arc::new(ResolvedOriginInner {
74                cardinality,
75                process_id,
76                container_id,
77                pod_uid,
78                resolved_external_data,
79            }),
80        }
81    }
82
83    /// Returns the cardinality of the origin.
84    pub fn cardinality(&self) -> Option<OriginTagCardinality> {
85        self.inner.cardinality
86    }
87
88    /// Returns the process ID of the origin.
89    pub fn process_id(&self) -> Option<&EntityId> {
90        self.inner.process_id.as_ref()
91    }
92
93    /// Returns the container ID of the origin.
94    pub fn container_id(&self) -> Option<&EntityId> {
95        self.inner.container_id.as_ref()
96    }
97
98    /// Returns the pod UID of the origin.
99    pub fn pod_uid(&self) -> Option<&EntityId> {
100        self.inner.pod_uid.as_ref()
101    }
102
103    /// Returns the resolved external data of the origin.
104    pub fn resolved_external_data(&self) -> Option<&ResolvedExternalData> {
105        self.inner.resolved_external_data.as_ref()
106    }
107}
108
109/// Resolves and tracks origins.
110#[derive(Clone)]
111pub struct OriginResolver {
112    ed_resolver: ExternalDataStoreResolver,
113    origin_cache: Cache<u64, ResolvedOrigin>,
114}
115
116impl OriginResolver {
117    /// Creates a new `OriginResolver`.
118    pub fn new(ed_resolver: ExternalDataStoreResolver) -> Self {
119        Self {
120            ed_resolver,
121            origin_cache: CacheBuilder::from_identifier("origin_cache")
122                .expect("identifier cannot be invalid")
123                .with_capacity(DEFAULT_ORIGIN_CACHE_ITEM_LIMIT)
124                .with_time_to_idle(Some(DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE))
125                .build(),
126        }
127    }
128
129    fn build_resolved_origin(&self, origin: RawOrigin<'_>) -> ResolvedOrigin {
130        ResolvedOrigin::from_parts(
131            origin.cardinality(),
132            origin.process_id().map(EntityId::ContainerPid),
133            origin.container_id().and_then(EntityId::from_raw_container_id),
134            origin.pod_uid().and_then(EntityId::from_pod_uid),
135            origin
136                .external_data()
137                .and_then(|raw_ed| self.ed_resolver.resolve(raw_ed)),
138        )
139    }
140
141    pub(crate) fn get_resolved_origin(&self, origin: RawOrigin<'_>) -> Option<ResolvedOrigin> {
142        // If there's no origin information at all, then there's nothing to key off of.
143        if origin.is_empty() {
144            return None;
145        }
146
147        // Create the origin key, and populate our cache with the resolved origin if we don't already have it.
148        let origin_key = hash_single_fast(&origin);
149        match self.origin_cache.get(&origin_key) {
150            Some(resolved_origin) => {
151                trace!(?origin_key, "Found origin in cache.");
152                Some(resolved_origin)
153            }
154            None => {
155                trace!(?origin_key, "Origin not found in cache. Resolving.");
156                let resolved_origin = self.build_resolved_origin(origin);
157                self.origin_cache.insert(origin_key, resolved_origin.clone());
158
159                Some(resolved_origin)
160            }
161        }
162    }
163}
164
165/*
166#[cfg(test)]
167mod tests {
168    use std::num::NonZeroUsize;
169
170    use stringtheory::MetaString;
171
172    use super::*;
173    use crate::workload::{
174        aggregator::MetadataStore as _,
175        stores::{ExternalDataStore, TagStore},
176        MetadataOperation,
177    };
178
179    const PROCESS_ID_A_RAW: u32 = 1;
180    const PROCESS_ID_B_RAW: u32 = 2;
181    const CONTAINER_ID_A_RAW: &str = "container-a";
182    const CONTAINER_ID_B_RAW: &str = "container-b";
183    const CONTAINER_ID_C_RAW: &str = "container-c";
184    const PROCESS_ID_A: EntityId = EntityId::ContainerPid(PROCESS_ID_A_RAW);
185    const PROCESS_ID_B: EntityId = EntityId::ContainerPid(PROCESS_ID_B_RAW);
186    const CONTAINER_ID_A: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_A_RAW));
187    const CONTAINER_ID_B: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_B_RAW));
188    const CONTAINER_ID_C: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_C_RAW));
189
190    fn create_raw_origin(process_id: u32, container_id: Option<&'static str>) -> RawOrigin<'static> {
191        let mut raw_origin = RawOrigin::default();
192        raw_origin.set_process_id(process_id);
193        raw_origin.set_container_id(container_id);
194        raw_origin
195    }
196
197    #[track_caller]
198    fn create_origin_resolver<const N: usize>(aliases: [(EntityId, EntityId); N]) -> OriginResolver {
199        // Create our tag store and seed it with any provided aliases.
200        let mut tag_store = TagStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
201        let tag_store_querier = tag_store.querier();
202
203        for (entity_id, alias) in aliases {
204            tag_store.process_operation(MetadataOperation::add_alias(entity_id.clone(), alias.clone()));
205            assert_eq!(tag_store_querier.get_entity_alias(&entity_id), Some(alias));
206        }
207
208        let external_data_store = ExternalDataStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
209
210        OriginResolver::new(external_data_store.resolver())
211    }
212
213    #[test]
214    fn resolve_origin_no_aliases_different_process_id_no_container_id() {
215        // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
216        // the data they contain.
217        let origin_resolver = create_origin_resolver([]);
218
219        // Assert that the two resulting resolved origins are equal.
220        //
221        // While the raw origins should be different (different process IDs, no container ID), the resolved origins
222        // should end up with no container ID, as the raw origins don't have one and no aliases were present, which
223        // should result in both origins being the same due to effectively being empty.
224        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
225        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
226        assert_ne!(raw_origin_a, raw_origin_b);
227
228        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
229        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
230        assert_eq!(origin_key_a, origin_key_b);
231
232        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
233        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
234        assert_eq!(resolved_origin_a, resolved_origin_b);
235        assert_eq!(resolved_origin_a.container_id(), None);
236        assert_eq!(resolved_origin_b.container_id(), None);
237    }
238
239    #[test]
240    fn resolve_origin_no_aliases_different_process_id_different_container_id() {
241        // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
242        // the data they contain.
243        let origin_resolver = create_origin_resolver([]);
244
245        // Assert that the two resulting resolved origins are not equal.
246        //
247        // The raw origins should be different (different process IDs, different container IDs), and the resolved
248        // origins should be different, given that even after resolving the process ID, the resulting origins should
249        // have different container IDs.
250        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
251        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
252        assert_ne!(raw_origin_a, raw_origin_b);
253
254        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
255        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
256        assert_ne!(origin_key_a, origin_key_b);
257
258        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
259        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
260        assert_ne!(resolved_origin_a, resolved_origin_b);
261        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
262        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
263    }
264
265    #[test]
266    fn resolve_origin_same_alias_different_process_ids_no_container_id() {
267        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
268        // container ID B.
269        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
270
271        // Assert that the two resulting resolved origins are equal.
272        //
273        // While the raw origins should be different (different process IDs, no container ID), the resolved origins
274        // should use the aliased container ID for each process ID, which is the same for both origins.
275        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
276        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
277        assert_ne!(raw_origin_a, raw_origin_b);
278
279        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
280        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
281        assert_eq!(origin_key_a, origin_key_b);
282
283        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
284        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
285        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_B));
286        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
287    }
288
289    #[test]
290    fn resolve_origin_same_alias_different_process_ids_same_container_id() {
291        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
292        // container B.
293        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
294
295        // Assert that the two resulting resolved origins are equal.
296        //
297        // While the raw origins should be different (different process IDs, same container ID), the resolved origins
298        // should ignore the process IDs and their aliases and use the provided container ID, which is the same for both
299        // origins.
300        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
301        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_A_RAW));
302        assert_ne!(raw_origin_a, raw_origin_b);
303
304        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
305        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
306        assert_eq!(origin_key_a, origin_key_b);
307
308        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
309        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
310        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
311        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_A));
312    }
313
314    #[test]
315    fn resolve_origin_same_alias_different_process_ids_different_container_id() {
316        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID C, and process ID B to
317        // container C.
318        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_C), (PROCESS_ID_B, CONTAINER_ID_C)]);
319
320        // Assert that the two resulting resolved origins are not equal.
321        //
322        // The raw origins should be different (different process IDs, different container IDs), and the resolved
323        // origins should be different, given that the process ID resolution should not affect the explicitly provided
324        // container IDs, which are different.
325        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
326        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
327        assert_ne!(raw_origin_a, raw_origin_b);
328
329        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
330        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
331        assert_ne!(origin_key_a, origin_key_b);
332
333        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
334        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
335        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
336        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
337    }
338}
339*/