saluki_components/transforms/host_enrichment/
mod.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::{components::transforms::*, topology::EventsBuffer};
6use saluki_core::{components::ComponentContext, data_model::event::metric::Metric};
7use saluki_env::{EnvironmentProvider, HostProvider};
8use saluki_error::GenericError;
9
/// Configuration for the host enrichment transform.
///
/// Wraps an environment provider of type `E`. The provider is not queried when the configuration is created; it is
/// only borrowed later, when the transform itself is built.
pub struct HostEnrichmentConfiguration<E> {
    /// Environment provider handed to `HostEnrichment::from_environment_provider` at build time.
    env_provider: E,
}
18
19impl<E> HostEnrichmentConfiguration<E> {
20 pub fn from_environment_provider(env_provider: E) -> Self {
22 Self { env_provider }
23 }
24}
25
26#[async_trait]
27impl<E> SynchronousTransformBuilder for HostEnrichmentConfiguration<E>
28where
29 E: EnvironmentProvider + Send + Sync + 'static,
30 <E::Host as HostProvider>::Error: Into<GenericError>,
31{
32 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
33 Ok(Box::new(
34 HostEnrichment::from_environment_provider(&self.env_provider).await?,
35 ))
36 }
37}
38
39impl<E> MemoryBounds for HostEnrichmentConfiguration<E> {
40 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
41 builder
49 .minimum()
50 .with_single_value::<HostEnrichment>("component struct");
51 }
52}
53
/// Synchronous transform that assigns a hostname to metrics which do not already carry one.
///
/// Metrics that already have a hostname, and events that are not metrics, are left untouched.
pub struct HostEnrichment {
    /// Hostname obtained from the environment provider when the transform was constructed. Stored as `Arc<str>` so
    /// that attaching it to a metric only clones the reference-counted pointer.
    hostname: Arc<str>,
}
57
58impl HostEnrichment {
59 pub async fn from_environment_provider<E>(env_provider: &E) -> Result<Self, GenericError>
60 where
61 E: EnvironmentProvider + Send + Sync + 'static,
62 <E::Host as HostProvider>::Error: Into<GenericError>,
63 {
64 Ok(Self {
65 hostname: env_provider
66 .host()
67 .get_hostname()
68 .await
69 .map(Arc::from)
70 .map_err(Into::into)?,
71 })
72 }
73
74 fn enrich_metric(&self, metric: &mut Metric) {
75 if metric.metadata().hostname().is_none() {
77 metric.metadata_mut().set_hostname(self.hostname.clone());
78 }
79 }
80}
81
82impl SynchronousTransform for HostEnrichment {
83 fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
84 for event in event_buffer {
85 if let Some(metric) = event.try_as_metric_mut() {
86 self.enrich_metric(metric)
87 }
88 }
89 }
90}