saluki_components/transforms/host_enrichment/
mod.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::{components::transforms::*, topology::EventsBuffer};
6use saluki_core::{components::ComponentContext, data_model::event::metric::Metric};
7use saluki_env::{EnvironmentProvider, HostProvider};
8use saluki_error::GenericError;
9
10/// Host Enrichment synchronous transform.
11///
12/// Enriches metrics with a hostname if one is not already present. Calculates the hostname to use based on the
13/// configured environment provider, allowing for a high degree of accuracy around what qualifies as a hostname, and how
14/// to query it.
15pub struct HostEnrichmentConfiguration<E> {
16    env_provider: E,
17}
18
19impl<E> HostEnrichmentConfiguration<E> {
20    /// Creates a new `HostEnrichmentConfiguration` with the given environment provider.
21    pub fn from_environment_provider(env_provider: E) -> Self {
22        Self { env_provider }
23    }
24}
25
26#[async_trait]
27impl<E> SynchronousTransformBuilder for HostEnrichmentConfiguration<E>
28where
29    E: EnvironmentProvider + Send + Sync + 'static,
30    <E::Host as HostProvider>::Error: Into<GenericError>,
31{
32    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
33        Ok(Box::new(
34            HostEnrichment::from_environment_provider(&self.env_provider).await?,
35        ))
36    }
37}
38
39impl<E> MemoryBounds for HostEnrichmentConfiguration<E> {
40    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
41        // TODO: We don't account for the size of the hostname since we only query it when we go to actually build the
42        // transform. We could move the querying to the point where we create `HostEnrichmentConfiguration` itself but
43        // that would mean it couldn't be updated dynamically.
44        //
45        // Not a relevant problem _right now_, but a _potential_ problem in the future. :shrug:
46
47        // Capture the size of the heap allocation when the component is built.
48        builder
49            .minimum()
50            .with_single_value::<HostEnrichment>("component struct");
51    }
52}
53
54pub struct HostEnrichment {
55    hostname: Arc<str>,
56}
57
58impl HostEnrichment {
59    pub async fn from_environment_provider<E>(env_provider: &E) -> Result<Self, GenericError>
60    where
61        E: EnvironmentProvider + Send + Sync + 'static,
62        <E::Host as HostProvider>::Error: Into<GenericError>,
63    {
64        Ok(Self {
65            hostname: env_provider
66                .host()
67                .get_hostname()
68                .await
69                .map(Arc::from)
70                .map_err(Into::into)?,
71        })
72    }
73
74    fn enrich_metric(&self, metric: &mut Metric) {
75        // Only add the hostname if it's not already present.
76        if metric.metadata().hostname().is_none() {
77            metric.metadata_mut().set_hostname(self.hostname.clone());
78        }
79    }
80}
81
82impl SynchronousTransform for HostEnrichment {
83    fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
84        for event in event_buffer {
85            if let Some(metric) = event.try_as_metric_mut() {
86                self.enrich_metric(metric)
87            }
88        }
89    }
90}