saluki_env/workload/origin.rs
1//! Origin detection and resolution.
2
3use std::{num::NonZeroUsize, sync::Arc, time::Duration};
4
5use saluki_common::{
6 cache::{Cache, CacheBuilder},
7 hash::hash_single_fast,
8};
9use saluki_context::origin::{OriginTagCardinality, RawOrigin};
10use tracing::trace;
11
12use super::stores::ExternalDataStoreResolver;
13use crate::workload::EntityId;
14
// The literal is non-zero, and since this is a `const` initializer the `unwrap` is checked at compile time.
/// Item limit handed to the origin cache builder in `OriginResolver::new`.
const DEFAULT_ORIGIN_CACHE_ITEM_LIMIT: NonZeroUsize = NonZeroUsize::new(500_000).unwrap();
/// Time-to-idle (30 seconds) handed to the origin cache builder in `OriginResolver::new`.
const DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE: Duration = Duration::from_secs(30);
18
/// A resolved External Data entry.
///
/// Pairs the pod entity ID and the container entity ID that make up the entry.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct ResolvedExternalData {
    /// Entity ID of the pod.
    pod_entity_id: EntityId,
    /// Entity ID of the container.
    container_entity_id: EntityId,
}
25
26impl ResolvedExternalData {
27 /// Creates a new `ResolvedExternalData` from the given pod and container entity IDs.
28 pub fn new(pod_entity_id: EntityId, container_entity_id: EntityId) -> Self {
29 Self {
30 pod_entity_id,
31 container_entity_id,
32 }
33 }
34
35 /// Returns a reference to the pod entity ID.
36 pub fn pod_entity_id(&self) -> &EntityId {
37 &self.pod_entity_id
38 }
39
40 /// Returns a reference to the container entity ID.
41 pub fn container_entity_id(&self) -> &EntityId {
42 &self.container_entity_id
43 }
44}
45
/// Shared payload behind a `ResolvedOrigin`.
///
/// Every field is optional and is stored exactly as handed to `ResolvedOrigin::from_parts`.
#[derive(Debug, Hash, Eq, PartialEq)]
struct ResolvedOriginInner {
    /// Tag cardinality of the origin, if any.
    cardinality: Option<OriginTagCardinality>,
    /// Entity ID for the origin's process ID, if any.
    process_id: Option<EntityId>,
    /// Entity ID for the origin's container ID, if any.
    container_id: Option<EntityId>,
    /// Entity ID for the origin's pod UID, if any.
    pod_uid: Option<EntityId>,
    /// Resolved External Data of the origin, if any.
    resolved_external_data: Option<ResolvedExternalData>,
}
54
/// A resolved representation of `RawOrigin<'a>`.
///
/// This representation is used to store the pre-calculated entity IDs derived from a borrowed `RawOrigin<'a>` in order
/// to speed up the lookup of origin tags attached to each individual entity ID that comprises an origin.
///
/// This type can be cheaply cloned and shared across threads.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct ResolvedOrigin {
    // Held behind an `Arc` so that cloning only bumps a reference count instead of copying the fields.
    inner: Arc<ResolvedOriginInner>,
}
65
66impl ResolvedOrigin {
67 /// Creates a new `ResolvedOrigin` from the given parts.
68 pub fn from_parts(
69 cardinality: Option<OriginTagCardinality>, process_id: Option<EntityId>, container_id: Option<EntityId>,
70 pod_uid: Option<EntityId>, resolved_external_data: Option<ResolvedExternalData>,
71 ) -> Self {
72 Self {
73 inner: Arc::new(ResolvedOriginInner {
74 cardinality,
75 process_id,
76 container_id,
77 pod_uid,
78 resolved_external_data,
79 }),
80 }
81 }
82
83 /// Returns the cardinality of the origin.
84 pub fn cardinality(&self) -> Option<OriginTagCardinality> {
85 self.inner.cardinality
86 }
87
88 /// Returns the process ID of the origin.
89 pub fn process_id(&self) -> Option<&EntityId> {
90 self.inner.process_id.as_ref()
91 }
92
93 /// Returns the container ID of the origin.
94 pub fn container_id(&self) -> Option<&EntityId> {
95 self.inner.container_id.as_ref()
96 }
97
98 /// Returns the pod UID of the origin.
99 pub fn pod_uid(&self) -> Option<&EntityId> {
100 self.inner.pod_uid.as_ref()
101 }
102
103 /// Returns the resolved external data of the origin.
104 pub fn resolved_external_data(&self) -> Option<&ResolvedExternalData> {
105 self.inner.resolved_external_data.as_ref()
106 }
107}
108
/// Resolves and tracks origins.
///
/// Resolved origins are kept in a cache keyed by the hash of the raw origin they were built from.
#[derive(Clone)]
pub struct OriginResolver {
    /// Resolver used to turn an origin's raw External Data into a `ResolvedExternalData`.
    ed_resolver: ExternalDataStoreResolver,
    /// Cache of resolved origins, keyed by the `hash_single_fast` hash of the raw origin.
    origin_cache: Cache<u64, ResolvedOrigin>,
}
115
116impl OriginResolver {
117 /// Creates a new `OriginResolver`.
118 pub fn new(ed_resolver: ExternalDataStoreResolver) -> Self {
119 Self {
120 ed_resolver,
121 origin_cache: CacheBuilder::from_identifier("origin_cache")
122 .expect("identifier cannot be invalid")
123 .with_capacity(DEFAULT_ORIGIN_CACHE_ITEM_LIMIT)
124 .with_time_to_idle(Some(DEFAULT_ORIGIN_CACHE_ITEM_TIME_TO_IDLE))
125 .build(),
126 }
127 }
128
129 fn build_resolved_origin(&self, origin: RawOrigin<'_>) -> ResolvedOrigin {
130 ResolvedOrigin::from_parts(
131 origin.cardinality(),
132 origin.process_id().map(EntityId::ContainerPid),
133 origin.container_id().and_then(EntityId::from_raw_container_id),
134 origin.pod_uid().and_then(EntityId::from_pod_uid),
135 origin
136 .external_data()
137 .and_then(|raw_ed| self.ed_resolver.resolve(raw_ed)),
138 )
139 }
140
141 pub(crate) fn get_resolved_origin(&self, origin: RawOrigin<'_>) -> Option<ResolvedOrigin> {
142 // If there's no origin information at all, then there's nothing to key off of.
143 if origin.is_empty() {
144 return None;
145 }
146
147 // Create the origin key, and populate our cache with the resolved origin if we don't already have it.
148 let origin_key = hash_single_fast(&origin);
149 match self.origin_cache.get(&origin_key) {
150 Some(resolved_origin) => {
151 trace!(?origin_key, "Found origin in cache.");
152 Some(resolved_origin)
153 }
154 None => {
155 trace!(?origin_key, "Origin not found in cache. Resolving.");
156 let resolved_origin = self.build_resolved_origin(origin);
157 self.origin_cache.insert(origin_key, resolved_origin.clone());
158
159 Some(resolved_origin)
160 }
161 }
162 }
163}
164
165/*
166#[cfg(test)]
167mod tests {
168 use std::num::NonZeroUsize;
169
170 use stringtheory::MetaString;
171
172 use super::*;
173 use crate::workload::{
174 aggregator::MetadataStore as _,
175 stores::{ExternalDataStore, TagStore},
176 MetadataOperation,
177 };
178
179 const PROCESS_ID_A_RAW: u32 = 1;
180 const PROCESS_ID_B_RAW: u32 = 2;
181 const CONTAINER_ID_A_RAW: &str = "container-a";
182 const CONTAINER_ID_B_RAW: &str = "container-b";
183 const CONTAINER_ID_C_RAW: &str = "container-c";
184 const PROCESS_ID_A: EntityId = EntityId::ContainerPid(PROCESS_ID_A_RAW);
185 const PROCESS_ID_B: EntityId = EntityId::ContainerPid(PROCESS_ID_B_RAW);
186 const CONTAINER_ID_A: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_A_RAW));
187 const CONTAINER_ID_B: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_B_RAW));
188 const CONTAINER_ID_C: EntityId = EntityId::Container(MetaString::from_static(CONTAINER_ID_C_RAW));
189
190 fn create_raw_origin(process_id: u32, container_id: Option<&'static str>) -> RawOrigin<'static> {
191 let mut raw_origin = RawOrigin::default();
192 raw_origin.set_process_id(process_id);
193 raw_origin.set_container_id(container_id);
194 raw_origin
195 }
196
197 #[track_caller]
198 fn create_origin_resolver<const N: usize>(aliases: [(EntityId, EntityId); N]) -> OriginResolver {
199 // Create our tag store and seed it with any provided aliases.
200 let mut tag_store = TagStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
201 let tag_store_querier = tag_store.querier();
202
203 for (entity_id, alias) in aliases {
204 tag_store.process_operation(MetadataOperation::add_alias(entity_id.clone(), alias.clone()));
205 assert_eq!(tag_store_querier.get_entity_alias(&entity_id), Some(alias));
206 }
207
208 let external_data_store = ExternalDataStore::with_entity_limit(NonZeroUsize::new(usize::MAX).unwrap());
209
210 OriginResolver::new(external_data_store.resolver())
211 }
212
213 #[test]
214 fn resolve_origin_no_aliases_different_process_id_no_container_id() {
215 // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
216 // the data they contain.
217 let origin_resolver = create_origin_resolver([]);
218
219 // Assert that the two resulting resolved origins are equal.
220 //
221 // While the raw origins should be different (different process IDs, no container ID), the resolved origins
222 // should end up with no container ID, as the raw origins don't have one and no aliases were present, which
223 // should result in both origins being the same due to effectively being empty.
224 let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
225 let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
226 assert_ne!(raw_origin_a, raw_origin_b);
227
228 let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
229 let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
230 assert_eq!(origin_key_a, origin_key_b);
231
232 let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
233 let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
234 assert_eq!(resolved_origin_a, resolved_origin_b);
235 assert_eq!(resolved_origin_a.container_id(), None);
236 assert_eq!(resolved_origin_b.container_id(), None);
237 }
238
239 #[test]
240 fn resolve_origin_no_aliases_different_process_id_different_container_id() {
241 // Create our origin resolver with no aliases pre-loaded, so we're just resolving the raw origins based on only
242 // the data they contain.
243 let origin_resolver = create_origin_resolver([]);
244
245 // Assert that the two resulting resolved origins are not equal.
246 //
247 // The raw origins should be different (different process IDs, different container IDs), and the resolved
248 // origins should be different, given that even after resolving the process ID, the resulting origins should
249 // have different container IDs.
250 let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
251 let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
252 assert_ne!(raw_origin_a, raw_origin_b);
253
254 let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
255 let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
256 assert_ne!(origin_key_a, origin_key_b);
257
258 let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
259 let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
260 assert_ne!(resolved_origin_a, resolved_origin_b);
261 assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
262 assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
263 }
264
265 #[test]
266 fn resolve_origin_same_alias_different_process_ids_no_container_id() {
267 // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
268 // container ID B.
269 let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
270
271 // Assert that the two resulting resolved origins are equal.
272 //
273 // While the raw origins should be different (different process IDs, no container ID), the resolved origins
274 // should use the aliased container ID for each process ID, which is the same for both origins.
275 let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, None);
276 let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, None);
277 assert_ne!(raw_origin_a, raw_origin_b);
278
279 let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
280 let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
281 assert_eq!(origin_key_a, origin_key_b);
282
283 let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
284 let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
285 assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_B));
286 assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
287 }
288
289 #[test]
290 fn resolve_origin_same_alias_different_process_ids_same_container_id() {
291 // Create our origin resolver with two aliases pre-loaded: process ID A to container ID B, and process ID B to
292 // container B.
293 let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_B), (PROCESS_ID_B, CONTAINER_ID_B)]);
294
295 // Assert that the two resulting resolved origins are equal.
296 //
297 // While the raw origins should be different (different process IDs, same container ID), the resolved origins
298 // should ignore the process IDs and their aliases and use the provided container ID, which is the same for both
299 // origins.
300 let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
301 let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_A_RAW));
302 assert_ne!(raw_origin_a, raw_origin_b);
303
304 let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
305 let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
306 assert_eq!(origin_key_a, origin_key_b);
307
308 let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
309 let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
310 assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
311 assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_A));
312 }
313
314 #[test]
315 fn resolve_origin_same_alias_different_process_ids_different_container_id() {
316 // Create our origin resolver with two aliases pre-loaded: process ID A to container ID C, and process ID B to
317 // container C.
318 let origin_resolver = create_origin_resolver([(PROCESS_ID_A, CONTAINER_ID_C), (PROCESS_ID_B, CONTAINER_ID_C)]);
319
320 // Assert that the two resulting resolved origins are not equal.
321 //
322 // The raw origins should be different (different process IDs, different container IDs), and the resolved
323 // origins should be different, given that the process ID resolution should not affect the explicitly provided
324 // container IDs, which are different.
325 let raw_origin_a = create_raw_origin(PROCESS_ID_A_RAW, Some(CONTAINER_ID_A_RAW));
326 let raw_origin_b = create_raw_origin(PROCESS_ID_B_RAW, Some(CONTAINER_ID_B_RAW));
327 assert_ne!(raw_origin_a, raw_origin_b);
328
329 let origin_key_a = origin_resolver.resolve_origin(raw_origin_a).unwrap();
330 let origin_key_b = origin_resolver.resolve_origin(raw_origin_b).unwrap();
331 assert_ne!(origin_key_a, origin_key_b);
332
333 let resolved_origin_a = origin_resolver.get_resolved_origin_by_key(&origin_key_a).unwrap();
334 let resolved_origin_b = origin_resolver.get_resolved_origin_by_key(&origin_key_b).unwrap();
335 assert_eq!(resolved_origin_a.container_id(), Some(&CONTAINER_ID_A));
336 assert_eq!(resolved_origin_b.container_id(), Some(&CONTAINER_ID_B));
337 }
338}
339*/