Skip to main content

saluki_components/transforms/host_enrichment/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::{components::transforms::*, topology::EventsBuffer};
6use saluki_core::{
7    components::ComponentContext,
8    data_model::event::{eventd::EventD, metric::Metric, service_check::ServiceCheck},
9};
10use saluki_env::{EnvironmentProvider, HostProvider};
11use saluki_error::GenericError;
12
13/// Host Enrichment synchronous transform.
14///
15/// Enriches metrics with a hostname if one is not already present. Calculates the hostname to use based on the
16/// configured environment provider, allowing for a high degree of accuracy around what qualifies as a hostname, and how
17/// to query it.
18pub struct HostEnrichmentConfiguration<E> {
19    env_provider: E,
20}
21
22impl<E> HostEnrichmentConfiguration<E> {
23    /// Creates a new `HostEnrichmentConfiguration` with the given environment provider.
24    pub fn from_environment_provider(env_provider: E) -> Self {
25        Self { env_provider }
26    }
27}
28
29#[async_trait]
30impl<E> SynchronousTransformBuilder for HostEnrichmentConfiguration<E>
31where
32    E: EnvironmentProvider + Send + Sync + 'static,
33    <E::Host as HostProvider>::Error: Into<GenericError>,
34{
35    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
36        Ok(Box::new(
37            HostEnrichment::from_environment_provider(&self.env_provider).await?,
38        ))
39    }
40}
41
42impl<E> MemoryBounds for HostEnrichmentConfiguration<E> {
43    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
44        // TODO: We don't account for the size of the hostname since we only query it when we go to actually build the
45        // transform. We could move the querying to the point where we create `HostEnrichmentConfiguration` itself but
46        // that would mean it couldn't be updated dynamically.
47        //
48        // Not a relevant problem _right now_, but a _potential_ problem in the future. :shrug:
49
50        // Capture the size of the heap allocation when the component is built.
51        builder
52            .minimum()
53            .with_single_value::<HostEnrichment>("component struct");
54    }
55}
56
57pub struct HostEnrichment {
58    hostname: Arc<str>,
59}
60
61impl HostEnrichment {
62    pub async fn from_environment_provider<E>(env_provider: &E) -> Result<Self, GenericError>
63    where
64        E: EnvironmentProvider + Send + Sync + 'static,
65        <E::Host as HostProvider>::Error: Into<GenericError>,
66    {
67        Ok(Self {
68            hostname: env_provider
69                .host()
70                .get_hostname()
71                .await
72                .map(Arc::from)
73                .map_err(Into::into)?,
74        })
75    }
76
77    fn enrich_metric(&self, metric: &mut Metric) {
78        // Only add the hostname if it's not already present.
79        if metric.metadata().hostname().is_none() {
80            metric.metadata_mut().set_hostname(self.hostname.clone());
81        }
82    }
83
84    fn enrich_eventd(&self, eventd: &mut EventD) {
85        // Only add the hostname if it's not already present.
86        if eventd.hostname().is_none() {
87            eventd.set_hostname(Some(self.hostname.as_ref().into()));
88        }
89    }
90
91    fn enrich_service_check(&self, service_check: &mut ServiceCheck) {
92        // Only add the hostname if it's not already present.
93        if service_check.hostname().is_none() {
94            service_check.set_hostname(Some(self.hostname.as_ref().into()));
95        }
96    }
97}
98
99impl SynchronousTransform for HostEnrichment {
100    fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
101        for event in event_buffer {
102            if let Some(metric) = event.try_as_metric_mut() {
103                self.enrich_metric(metric);
104            } else if let Some(eventd) = event.try_as_eventd_mut() {
105                self.enrich_eventd(eventd);
106            } else if let Some(service_check) = event.try_as_service_check_mut() {
107                self.enrich_service_check(service_check);
108            }
109        }
110    }
111}