Skip to main content

saluki_env/workload/
origin.rs

1//! Origin detection and resolution.
2
3use std::{num::NonZeroUsize, sync::Arc, time::Duration};
4
5use saluki_common::{
6    cache::{Cache, CacheBuilder},
7    hash::hash_single_fast,
8};
9use saluki_context::origin::{OriginTagCardinality, RawOrigin};
10use tracing::trace;
11
12use super::stores::ExternalDataStoreResolver;
13use crate::workload::EntityId;
14
/// Capacity (item limit) used when building the origin cache.
///
/// The literal is non-zero, so the `None` arm can never be taken; because this is evaluated in a const context, a
/// zero value would fail the build instead of panicking at runtime.
const DEFAULT_ORIGIN_CACHE_ITEM_LIMIT: NonZeroUsize = match NonZeroUsize::new(500_000) {
    Some(limit) => limit,
    None => panic!("origin cache item limit must be non-zero"),
};

/// Time-to-idle configured on the origin cache.
const DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE: Duration = Duration::from_secs(30);
18
19/// A resolved External Data entry.
20#[derive(Clone, Debug, Eq, Hash, PartialEq)]
21pub struct ResolvedExternalData {
22    pod_entity_id: EntityId,
23    container_entity_id: EntityId,
24}
25
26impl ResolvedExternalData {
27    /// Creates a new `ResolvedExternalData` from the given pod and container entity IDs.
28    pub fn new(pod_entity_id: EntityId, container_entity_id: EntityId) -> Self {
29        Self {
30            pod_entity_id,
31            container_entity_id,
32        }
33    }
34
35    /// Returns a reference to the pod entity ID.
36    pub fn pod_entity_id(&self) -> &EntityId {
37        &self.pod_entity_id
38    }
39
40    /// Returns a reference to the container entity ID.
41    pub fn container_entity_id(&self) -> &EntityId {
42        &self.container_entity_id
43    }
44}
45
46#[derive(Debug, Hash, Eq, PartialEq)]
47struct ResolvedOriginInner {
48    cardinality: Option<OriginTagCardinality>,
49    process_id: Option<EntityId>,
50    local_data: Option<EntityId>,
51    pod_uid: Option<EntityId>,
52    resolved_external_data: Option<ResolvedExternalData>,
53}
54
55/// An resolved representation of `RawOrigin<'a>`
56///
57/// This representation is used to store the pre-calculated entity IDs derived from a borrowed `RawOrigin<'a>` in order
58/// to speed the lookup of origin tags attached to each individual entity ID that comprises an origin.
59///
60/// This type can be cheaply cloned and shared across threads.
61#[derive(Clone, Debug, Hash, Eq, PartialEq)]
62pub struct ResolvedOrigin {
63    inner: Arc<ResolvedOriginInner>,
64}
65
66impl ResolvedOrigin {
67    /// Creates a new `ResolvedOrigin` from the given parts.
68    pub fn from_parts(
69        cardinality: Option<OriginTagCardinality>, process_id: Option<EntityId>, local_data: Option<EntityId>,
70        pod_uid: Option<EntityId>, resolved_external_data: Option<ResolvedExternalData>,
71    ) -> Self {
72        Self {
73            inner: Arc::new(ResolvedOriginInner {
74                cardinality,
75                process_id,
76                local_data,
77                pod_uid,
78                resolved_external_data,
79            }),
80        }
81    }
82
83    /// Returns the cardinality of the origin.
84    pub fn cardinality(&self) -> Option<OriginTagCardinality> {
85        self.inner.cardinality
86    }
87
88    /// Returns the process ID of the origin.
89    pub fn process_id(&self) -> Option<&EntityId> {
90        self.inner.process_id.as_ref()
91    }
92
93    /// Returns the Local Data-based entity ID of the origin.
94    pub fn local_data(&self) -> Option<&EntityId> {
95        self.inner.local_data.as_ref()
96    }
97
98    /// Returns the pod UID of the origin.
99    pub fn pod_uid(&self) -> Option<&EntityId> {
100        self.inner.pod_uid.as_ref()
101    }
102
103    /// Returns the resolved External Data of the origin.
104    pub fn resolved_external_data(&self) -> Option<&ResolvedExternalData> {
105        self.inner.resolved_external_data.as_ref()
106    }
107}
108
109/// Resolves and tracks origins.
110#[derive(Clone)]
111pub struct OriginResolver {
112    ed_resolver: ExternalDataStoreResolver,
113    origin_cache: Cache<u64, ResolvedOrigin>,
114}
115
116impl OriginResolver {
117    /// Creates a new `OriginResolver`.
118    pub fn new(ed_resolver: ExternalDataStoreResolver) -> Self {
119        Self {
120            ed_resolver,
121            origin_cache: CacheBuilder::from_identifier("origin_cache")
122                .expect("identifier cannot be invalid")
123                .with_capacity(DEFAULT_ORIGIN_CACHE_ITEM_LIMIT)
124                .with_time_to_idle(Some(DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE))
125                .build(),
126        }
127    }
128
129    fn build_resolved_origin(&self, origin: RawOrigin<'_>) -> ResolvedOrigin {
130        ResolvedOrigin::from_parts(
131            origin.cardinality(),
132            origin.process_id().map(EntityId::ContainerPid),
133            origin.local_data().and_then(EntityId::from_local_data),
134            origin.pod_uid().and_then(EntityId::from_pod_uid),
135            origin
136                .external_data()
137                .and_then(|raw_ed| self.ed_resolver.resolve(raw_ed)),
138        )
139    }
140
141    /// Returns the resolved origin for the given raw origin.
142    ///
143    /// If the raw origin is "empty" -- no origin information is available -- then `None` is returned.
144    ///
145    /// The resolved origin may be cached for speeding up future lookups.
146    pub fn get_resolved_origin(&self, origin: RawOrigin<'_>) -> Option<ResolvedOrigin> {
147        // If there's no origin information at all, then there's nothing to key off of.
148        if origin.is_empty() {
149            return None;
150        }
151
152        // Create the origin key, and populate our cache with the resolved origin if we don't already have it.
153        let origin_key = hash_single_fast(&origin);
154        match self.origin_cache.get(&origin_key) {
155            Some(resolved_origin) => {
156                trace!(?origin_key, "Found origin in cache.");
157                Some(resolved_origin)
158            }
159            None => {
160                trace!(?origin_key, "Origin not found in cache. Resolving.");
161                let resolved_origin = self.build_resolved_origin(origin);
162                self.origin_cache.insert(origin_key, resolved_origin.clone());
163
164                Some(resolved_origin)
165            }
166        }
167    }
168}
169
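// NOTE(review): the tests below are disabled. They call `resolve_origin`, `get_resolved_origin_by_key`, and
// `ResolvedOrigin::container_id`, none of which are defined in the code above -- TODO: port them to
// `get_resolved_origin` or remove them.
/*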
171#[cfg(test)]
172mod tests {
173    use std::num::NonZeroUsize;
174
175    use stringtheory::MetaString;
176
177    use super::*;
178    use crate::workload::{
179        aggregator::MetadataStore as _,
180        stores::{ExternalDataStore, TagStore},
181        MetadataOperation,
182    };
183
184    const PROCESS_ID_A_RAW: u32 = 1;
185    const PROCESS_ID_B_RAW: u32 = 2;
186    const CONTAINER_ID_A_RAW: &str = "container-a";
187    const CONTAINER_ID_B_RAW: &str = "container-b";
188    const CONTAINER_ID_C_RAW: &str = "container-c";
189    const PROCESS_ID_A: EntityId = EntityId::ContainerPid(PROCESS_ID_A_RAW);
190    const PROCESS_ID_B: EntityId = EntityId::ContainerPid(PROCESS_ID_B_RAW);
191    const CONTAINER_ID_A: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_A_RAW));
192    const CONTAINER_ID_B: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_B_RAW));
193    const CONTAINER_ID_C: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_C_RAW));
194
195    fn create_raw_origin(process_id: u32, container_id: Option<&'static str>) -> RawOrigin<'static> {
196        let mut raw_origin = RawOrigin::default();
197        raw_origin.set_process_id(process_id);
198        raw_origin.set_container_id(container_id);
199        raw_origin
200    }
201
202    #[track_caller]
203    fn create_origin_resolver<const N: usize>(aliases: [(EntityId, EntityId); N]) -> OriginResolver {
204        // Create our tag store and seed it with any provided aliases.
205        let mut tag_store = TagStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
206        let tag_store_querier = tag_store.querier();
207
208        for (entity_id, alias) in aliases {
209            tag_store.process_operation(MetadataOperation::add_alias(entity_id.clone(), alias.clone()));
210            assert_eq!(tag_store_querier.get_entity_alias(&entity_id), Some(alias));
211        }
212
213        let external_data_store = ExternalDataStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
214
215        OriginResolver::new(external_data_store.resolver())
216    }
217
218    #[test]
219    fn resolve_origin_no_aliases_different_process_id_no_container_id() {
220        // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
221        // the data they contain.
222        let origin_resolver = create_origin_resolver([]);
223
224        // Assert that the two resulting resolved origins are equal.
225        //
226        // While the raw origins should be different (different process IDs, no container ID), the resolved origins
227        // should end up with no container ID, as the raw origins don't have one and no aliases were present, which
        // should result in both origins being the same due to effectively being empty.
229        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
230        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
231        assert_ne!(raw_origin_a, raw_origin_b);
232
233        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
234        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
235        assert_eq!(origin_key_a, origin_key_b);
236
237        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
238        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
239        assert_eq!(resolved_origin_a, resolved_origin_b);
240        assert_eq!(resolved_origin_a.container_id(), None);
241        assert_eq!(resolved_origin_b.container_id(), None);
242    }
243
244    #[test]
245    fn resolve_origin_no_aliases_different_process_id_different_container_id() {
246        // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
247        // the data they contain.
248        let origin_resolver = create_origin_resolver([]);
249
250        // Assert that the two resulting resolved origins are not equal.
251        //
252        // The raw origins should be different (different process IDs, different container IDs), and the resolved
253        // origins should be different, given that even after resolving the process ID, the resulting origins should
254        // have different container IDs.
255        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
256        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
257        assert_ne!(raw_origin_a, raw_origin_b);
258
259        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
260        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
261        assert_ne!(origin_key_a, origin_key_b);
262
263        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
264        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
265        assert_ne!(resolved_origin_a, resolved_origin_b);
266        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
267        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
268    }
269
270    #[test]
271    fn resolve_origin_same_alias_different_process_ids_no_container_id() {
        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
273        // container ID B.
274        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
275
276        // Assert that the two resulting resolved origins are equal.
277        //
278        // While the raw origins should be different (different process IDs, no container ID), the resolved origins
279        // should use the aliased container ID for each process ID, which is the same for both origins.
280        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
281        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
282        assert_ne!(raw_origin_a, raw_origin_b);
283
284        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
285        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
286        assert_eq!(origin_key_a, origin_key_b);
287
288        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
289        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
290        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_B));
291        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
292    }
293
294    #[test]
295    fn resolve_origin_same_alias_different_process_ids_same_container_id() {
296        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
297        // container B.
298        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
299
300        // Assert that the two resulting resolved origins are equal.
301        //
302        // While the raw origins should be different (different process IDs, same container ID), the resolved origins
303        // should ignore the process IDs and their aliases and use the provided container ID, which is the same for both
304        // origins.
305        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
306        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_A_RAW));
307        assert_ne!(raw_origin_a, raw_origin_b);
308
309        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
310        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
311        assert_eq!(origin_key_a, origin_key_b);
312
313        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
314        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
315        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
316        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_A));
317    }
318
319    #[test]
320    fn resolve_origin_same_alias_different_process_ids_different_container_id() {
321        // Create our origin resolver with two aliases pre-loaded: process ID A to container ID C, and process ID B to
322        // container C.
323        let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_C), (PROCESS_ID_B, CONTAINER_ID_C)]);
324
325        // Assert that the two resulting resolved origins are not equal.
326        //
327        // The raw origins should be different (different process IDs, different container IDs), and the resolved
328        // origins should be different, given that the process ID resolution should not affect the explicitly provided
329        // container IDs, which are different.
330        let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
331        let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
332        assert_ne!(raw_origin_a, raw_origin_b);
333
334        let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
335        let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
336        assert_ne!(origin_key_a, origin_key_b);
337
338        let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
339        let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
340        assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
341        assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
342    }
343}
344*/