saluki_context/
origin.rs

1#![allow(warnings)]
2//! Metric origin.
3
4use std::{fmt, num::NonZeroU32, sync::Arc};
5
6use indexmap::Equivalent;
7use saluki_common::hash::hash_single_fast;
8use serde::{Deserialize, Serialize};
9use stringtheory::MetaString;
10use tracing::warn;
11
12use crate::tags::{SharedTagSet, Tag};
13
14/// The cardinality of tags associated with the origin entity.
15#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq)]
16#[serde(try_from = "String")]
17pub enum OriginTagCardinality {
18    /// No cardinality.
19    ///
20    /// This implies that no tags should be added to the metric based on its origin.
21    None,
22
23    /// Low cardinality.
24    ///
25    /// This generally covers tags which are static, or relatively slow to change, and generally results in a small
26    /// number of unique values for the given tag key.
27    Low,
28
29    /// Orchestrator cardinality.
30    ///
31    /// This generally covers orchestrator-specific tags, such as Kubernetes pod UID, and lands somewhere between low
32    /// and high cardinality.
33    Orchestrator,
34
35    /// High cardinality.
36    ///
37    /// This generally covers tags which frequently change and generally results in a large number of unique values for
38    /// the given tag key.
39    High,
40}
41
42impl OriginTagCardinality {
43    /// Returns the string representation of the cardinality.
44    pub const fn as_str(&self) -> &'static str {
45        match self {
46            Self::None => "none",
47            Self::Low => "low",
48            Self::Orchestrator => "orchestrator",
49            Self::High => "high",
50        }
51    }
52}
53
54impl TryFrom<&str> for OriginTagCardinality {
55    type Error = String;
56
57    fn try_from(value: &str) -> Result<Self, Self::Error> {
58        match value {
59            "none" => Ok(Self::None),
60            "low" => Ok(Self::Low),
61            "high" => Ok(Self::High),
62            "orch" | "orchestrator" => Ok(Self::Orchestrator),
63            other => Err(format!("unknown tag cardinality type '{}'", other)),
64        }
65    }
66}
67
68impl TryFrom<String> for OriginTagCardinality {
69    type Error = String;
70
71    fn try_from(value: String) -> Result<Self, Self::Error> {
72        Self::try_from(value.as_str())
73    }
74}
75
76impl fmt::Display for OriginTagCardinality {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            Self::None => write!(f, "none"),
80            Self::Low => write!(f, "low"),
81            Self::Orchestrator => write!(f, "orchestrator"),
82            Self::High => write!(f, "high"),
83        }
84    }
85}
86
87/// A raw representation of an origin.
88///
89/// Metrics contain metadata about their origin, in terms of the metric's _reason_ for existing: the metric was ingested
90/// via DogStatsD, or was generated by an integration, and so on. However, there is also the concept of a metric
91/// originating from a particular _entity_, such as a specific Kubernetes container. This relates directly to the
92/// specific sender of the metric, which is used to enrich the metric with additional tags describing the origin entity.
93///
94/// The origin entity will generally be the process ID of the metric sender, or the container ID, both of which are then
95/// generally mapped to the relevant information for the metric, such as the orchestrator-level tags for the
96/// container/pod/deployment.
97#[derive(Clone, Debug, Default, Eq, Hash, PartialEq)]
98pub struct RawOrigin<'a> {
99    /// Process ID of the sender.
100    process_id: Option<NonZeroU32>,
101
102    /// Container ID of the sender.
103    ///
104    /// This will generally be the typical long hexadecimal string that is used by container runtimes like `containerd`,
105    /// but may sometimes also be a different form, such as the container's cgroups inode.
106    container_id: Option<&'a str>,
107
108    /// Pod UID of the sender.
109    ///
110    /// This is generally only used in Kubernetes environments to uniquely identify the pod. UIDs are equivalent to UUIDs.
111    pod_uid: Option<&'a str>,
112
113    /// Desired cardinality of any tags associated with the entity.
114    ///
115    /// This controls the cardinality of the tags added to this metric when enriching based on the available entity IDs.
116    cardinality: Option<OriginTagCardinality>,
117
118    /// External Data of the sender.
119    ///
120    /// See [`ExternalData`] for more information.
121    external_data: Option<RawExternalData<'a>>,
122}
123
124impl<'a> RawOrigin<'a> {
125    /// Returns `true` if the origin information is empty.
126    pub fn is_empty(&self) -> bool {
127        self.process_id.is_none()
128            && self.container_id.is_none()
129            && self.pod_uid.is_none()
130            && self.cardinality.is_none()
131            && self.external_data.is_none()
132    }
133
134    /// Unsets the process ID of the sender.
135    pub fn clear_process_id(&mut self) {
136        self.process_id = None;
137    }
138
139    /// Sets the process ID of the sender.
140    ///
141    /// Must be a non-zero value. If the value is zero, it is silently ignored.
142    pub fn set_process_id(&mut self, process_id: u32) {
143        self.process_id = NonZeroU32::new(process_id);
144    }
145
146    /// Returns the process ID of the sender.
147    pub fn process_id(&self) -> Option<u32> {
148        self.process_id.map(NonZeroU32::get)
149    }
150
151    /// Sets the container ID of the sender.
152    pub fn set_container_id(&mut self, container_id: impl Into<Option<&'a str>>) {
153        self.container_id = container_id.into();
154    }
155
156    /// Returns the container ID of the sender.
157    pub fn container_id(&self) -> Option<&str> {
158        self.container_id
159    }
160
161    /// Sets the pod UID of the sender.
162    pub fn set_pod_uid(&mut self, pod_uid: impl Into<Option<&'a str>>) {
163        self.pod_uid = pod_uid.into();
164    }
165
166    /// Returns the pod UID of the sender.
167    pub fn pod_uid(&self) -> Option<&str> {
168        self.pod_uid
169    }
170
171    /// Sets the desired cardinality of any tags associated with the entity.
172    pub fn set_cardinality(&mut self, cardinality: impl Into<Option<OriginTagCardinality>>) {
173        self.cardinality = cardinality.into();
174    }
175
176    /// Returns the desired cardinality of any tags associated with the entity.
177    pub fn cardinality(&self) -> Option<OriginTagCardinality> {
178        self.cardinality.as_ref().copied()
179    }
180
181    /// Sets the external data of the sender.
182    pub fn set_external_data(&mut self, external_data: impl Into<Option<&'a str>>) {
183        self.external_data = external_data.into().and_then(RawExternalData::try_from_str);
184    }
185
186    /// Returns the external data of the sender.
187    pub fn external_data(&self) -> Option<&RawExternalData<'a>> {
188        self.external_data.as_ref()
189    }
190}
191
192impl fmt::Display for RawOrigin<'_> {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        let mut has_written = false;
195
196        write!(f, "RawOrigin(")?;
197
198        if let Some(process_id) = self.process_id {
199            write!(f, "process_id={}", process_id)?;
200        }
201
202        if let Some(container_id) = self.container_id {
203            if has_written {
204                write!(f, " ")?;
205            } else {
206                has_written = true;
207            }
208            write!(f, "container_id={}", container_id)?;
209        }
210
211        if let Some(pod_uid) = self.pod_uid {
212            if has_written {
213                write!(f, " ")?;
214            } else {
215                has_written = true;
216            }
217            write!(f, "pod_uid={}", pod_uid)?;
218        }
219
220        if let Some(cardinality) = self.cardinality {
221            if has_written {
222                write!(f, " ")?;
223            } else {
224                has_written = true;
225            }
226            write!(f, "cardinality={}", cardinality)?;
227        }
228
229        if let Some(external_data) = self.external_data.as_ref() {
230            if has_written {
231                write!(f, " ")?;
232            }
233            write!(f, "external_data={}", external_data)?;
234        }
235
236        write!(f, ")")
237    }
238}
239
240/// A resolver for mapping origins to their associated tags.
241pub trait OriginTagsResolver: Send + Sync {
242    /// Resolves the origin tags for the given raw origin.
243    fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet;
244}
245
246impl<T> OriginTagsResolver for Arc<T>
247where
248    T: OriginTagsResolver,
249{
250    fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
251        (**self).resolve_origin_tags(origin)
252    }
253}
254
255impl<T> OriginTagsResolver for Option<T>
256where
257    T: OriginTagsResolver,
258{
259    fn resolve_origin_tags(&self, origin: RawOrigin<'_>) -> SharedTagSet {
260        self.as_ref()
261            .map(|resolver| resolver.resolve_origin_tags(origin))
262            .unwrap_or_else(|| SharedTagSet::default())
263    }
264}
265
266/// External Data associated with an origin.
267///
268/// "External Data" is a concept that is used to aid origin detection of workloads running in Kubernetes environments
269/// where introspection is not possible or may return incorrect information. Origin detection generally centers around
270/// determining the container where a metric originates from, and then enriching the metric with tags that describe that
271/// container, as well as the pod the container is running within, and so on. In some cases, the origin of a metric
272/// cannot be detected from the outside (such as by using peer credentials over Unix Domain sockets) and cannot be
273/// detected by the workload itself (such as when running in nested virtualization environments). In these cases, we
274/// need a mechanism that allows passing the necessary information to the client, who then passes it on to us, so that
275/// we can correctly resolve the origin.
276///
277/// "External Data" supports this by allowing for an external Kubernetes admission controller to attach specific
278/// metadata -- pod UID and container name -- to application pods, which is then read and sent along with metrics. This
279/// information is then used during origin detection in order to correlate the container ID of the origin, which is
280/// sufficient to allow enriching the metric with container-specific tags.
281///
282/// # Format
283///
284/// An External Data string is a comma-separated list of key/value pairs, where each key represents a particular aspect
285/// of the workload entity. The following keys are supported:
286///
287/// - `it-<true/false>`: A boolean value indicating whether the entity is an init container.
288/// - `pu-<pod_uid>`: The pod UID associated with the entity.
289/// - `cn-<container_name>`: The container name associated with the entity.
290///
291/// For parsing external data strings without allocating, see [`RawExternalData`].
292#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
293pub struct ExternalData {
294    pod_uid: MetaString,
295    container_name: MetaString,
296    init_container: bool,
297}
298
299impl ExternalData {
300    /// Creates a new `ExternalData` instance.
301    pub fn new(pod_uid: MetaString, container_name: MetaString, init_container: bool) -> Self {
302        Self {
303            pod_uid,
304            container_name,
305            init_container,
306        }
307    }
308
309    /// Returns the pod UID.
310    pub fn pod_uid(&self) -> &MetaString {
311        &self.pod_uid
312    }
313
314    /// Returns the container name.
315    pub fn container_name(&self) -> &MetaString {
316        &self.container_name
317    }
318
319    /// Returns `true` if the entity is an init container.
320    pub fn is_init_container(&self) -> bool {
321        self.init_container
322    }
323}
324
325impl std::hash::Hash for ExternalData {
326    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
327        (*self.pod_uid).hash(state);
328        (*self.container_name).hash(state);
329        self.init_container.hash(state);
330    }
331}
332
/// The borrowed counterpart of [`ExternalData`].
///
/// Parsing an external data string into this type needs no backing storage for any of the fields. It can also serve as
/// the lookup key for map entries (for example in a `HashMap`) whose key type is [`ExternalData`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RawExternalData<'a> {
    pod_uid: &'a str,
    container_name: &'a str,
    init_container: bool,
}
343
344impl<'a> RawExternalData<'a> {
345    /// Creates a new `RawExternalData` from a raw string.
346    ///
347    /// If the external data is not valid, `None` is returned.
348    pub fn try_from_str(raw: &'a str) -> Option<Self> {
349        if raw.is_empty() {
350            return None;
351        }
352
353        let mut data = Self {
354            pod_uid: "",
355            container_name: "",
356            init_container: false,
357        };
358
359        let parts = raw.split(',');
360        for part in parts {
361            if part.len() < 4 {
362                // All key/value pairs have a prefix of `xx-` where `xx` is some short code, so we basically can't have
363                // any real key/value pair that's less than four characters overall.
364                warn!("Parsed external data with invalid key/value pair: {}", part);
365                continue;
366            }
367
368            let key = &part[0..3];
369            let value = &part[3..];
370
371            match key {
372                "it-" => data.init_container = value.parse().unwrap_or(false),
373                "pu-" => data.pod_uid = value,
374                "cn-" => data.container_name = value,
375                _ => {
376                    // Unknown key, ignore.
377                    warn!("Parsed external data with unknown key: {}", key);
378                }
379            }
380        }
381
382        Some(data)
383    }
384}
385
386impl std::hash::Hash for RawExternalData<'_> {
387    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
388        self.pod_uid.hash(state);
389        self.container_name.hash(state);
390        self.init_container.hash(state);
391    }
392}
393
394impl fmt::Display for RawExternalData<'_> {
395    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396        write!(
397            f,
398            "pu-{},cn-{},it-{}",
399            self.pod_uid, self.container_name, self.init_container
400        )
401    }
402}
403
404impl Equivalent<ExternalData> for RawExternalData<'_> {
405    fn equivalent(&self, other: &ExternalData) -> bool {
406        self.pod_uid == &*other.pod_uid
407            && self.container_name == &*other.container_name
408            && self.init_container == other.init_container
409    }
410}
411
#[cfg(test)]
mod tests {
    use std::{
        collections::hash_map::DefaultHasher,
        hash::{Hash, Hasher},
    };

    use proptest::prelude::*;

    use super::*;

    /// Runs `value` through a fresh `DefaultHasher` and returns the resulting digest.
    fn digest_of<T: Hash>(value: &T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    proptest! {
        // The owned and borrowed representations must hash identically for the same field values.
        #[test]
        fn property_test_identical_hash_impls(pod_uid in "[a-z0-9]{1,64}", container_name in "[a-z0-9]{1,64}", init_container in any::<bool>()) {
            let owned = ExternalData::new(pod_uid.clone().into(), container_name.clone().into(), init_container);
            let borrowed = RawExternalData {
                pod_uid: &pod_uid,
                container_name: &container_name,
                init_container,
            };

            let owned_digest = digest_of(&owned);
            let borrowed_digest = digest_of(&borrowed);

            assert_eq!(owned_digest, borrowed_digest);
        }
    }
}