saluki_components/transforms/metric_router/
mod.rs

1use std::{collections::HashSet, sync::LazyLock};
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::{
6    components::{
7        transforms::{Transform, TransformBuilder, TransformContext},
8        ComponentContext,
9    },
10    data_model::event::{Event, EventType},
11    topology::{EventsBuffer, OutputDefinition},
12};
13use saluki_error::{generic_error, GenericError};
14use serde::Deserialize;
15use tokio::select;
16use tracing::{debug, error};
17
18/// Metric router transform configuration.
19#[derive(Deserialize, Clone)]
20pub struct MetricRouterConfiguration {
21    /// List of metric names to match against.
22    pub metric_names: Vec<String>,
23}
24
/// Transform that splits its input across two named outputs.
///
/// Metric events whose name is exactly equal to one of the configured names
/// are sent to the "matched" output. Every other event, including all
/// non-metric events, is sent to "unmatched". Only named outputs are declared
/// for this transform; it has no default output.
#[derive(Debug)]
pub struct MetricRouter {
    /// Configured metric names, stored as a set for membership checks.
    /// Duplicate names in the configuration collapse into one entry.
    metric_names: HashSet<String>,
}
33
34impl MetricRouter {
35    fn new(config: MetricRouterConfiguration) -> Result<Self, GenericError> {
36        for name in &config.metric_names {
37            if name.trim().is_empty() {
38                return Err(generic_error!("Metric names cannot be empty or whitespace-only"));
39            }
40        }
41
42        Ok(Self {
43            metric_names: config.metric_names.into_iter().collect(),
44        })
45    }
46
47    /// Evaluates whether the event should be routed to the matched output.
48    ///
49    /// Returns true if the event is a metric with a name that exactly matches
50    /// one of the configured metric names, false otherwise.
51    fn should_route_to_matched(&self, event: &Event) -> bool {
52        match event {
53            Event::Metric(metric) => {
54                // Check if the metric name exactly matches any of our configured names
55                let metric_name = metric.context().name();
56                self.metric_names.contains(metric_name.as_ref())
57            }
58            _ => {
59                // Non-metric events are always routed to unmatched
60                false
61            }
62        }
63    }
64
65    /// Processes a batch of events by routing them to matched or unmatched outputs.
66    async fn process_event_batch(
67        &self, mut events: EventsBuffer, context: &mut TransformContext,
68    ) -> Result<(), GenericError> {
69        // Extract matched events from the buffer, leaving unmatched ones behind
70        let matched_events = events.extract(|event| self.should_route_to_matched(event));
71
72        let matched_count = context
73            .dispatcher()
74            .buffered_named("matched")?
75            .send_all(matched_events)
76            .await?;
77
78        let unmatched_count = context
79            .dispatcher()
80            .buffered_named("unmatched")?
81            .send_all(events)
82            .await?;
83
84        debug!(
85            matched_events = matched_count,
86            unmatched_events = unmatched_count,
87            "Successfully processed event batch."
88        );
89
90        Ok(())
91    }
92}
93
94#[async_trait]
95impl TransformBuilder for MetricRouterConfiguration {
96    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
97        Ok(Box::new(MetricRouter::new(self.clone())?))
98    }
99
100    fn input_event_type(&self) -> EventType {
101        EventType::all_bits()
102    }
103
104    fn outputs(&self) -> &[OutputDefinition] {
105        static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
106            vec![
107                OutputDefinition::named_output("matched", EventType::all_bits()),
108                OutputDefinition::named_output("unmatched", EventType::all_bits()),
109            ]
110        });
111        &OUTPUTS
112    }
113}
114
115impl MemoryBounds for MetricRouterConfiguration {
116    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
117        builder
118            .minimum()
119            // Account for the MetricRouter struct itself when heap-allocated
120            .with_single_value::<MetricRouter>("component struct")
121            // Account for the HashSet<String> that stores metric names
122            // HashSet has overhead beyond just the strings themselves
123            .with_fixed_amount(
124                "hashset overhead",
125                std::mem::size_of::<std::collections::HashSet<String>>(),
126            )
127            // Account for the String allocations for each metric name
128            // This approximates the heap usage of the strings themselves
129            .with_fixed_amount(
130                "metric names strings",
131                self.metric_names
132                    .iter()
133                    .map(|name| name.len() + std::mem::size_of::<String>())
134                    .sum::<usize>(),
135            )
136            // Account for HashSet bucket overhead (rough approximation)
137            // HashSet typically allocates more buckets than items for good performance
138            .with_fixed_amount(
139                "hashset buckets",
140                self.metric_names.len() * std::mem::size_of::<Option<String>>() * 2, // Rough 2x bucket overhead
141            );
142    }
143}
144
145#[async_trait]
146impl Transform for MetricRouter {
147    async fn run(self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
148        let mut health = context.take_health_handle();
149
150        health.mark_ready();
151        debug!(
152            metric_names = ?self.metric_names,
153            "Metric Router transform started."
154        );
155
156        loop {
157            select! {
158                _ = health.live() => continue,
159                maybe_events = context.events().next() => match maybe_events {
160                    Some(events) => {
161                        // Process the batch of events with proper error handling
162                        if let Err(e) = self.process_event_batch(events, &mut context).await {
163                            error!(error = %e, "Failed to process event batch.");
164                            // Continue processing subsequent batches even if one fails
165                        }
166                    }
167                    None => {
168                        debug!("Event stream terminated, shutting down metric router transform");
169                        break;
170                    }
171                }
172            }
173        }
174
175        debug!("Metric Router transform stopped.");
176        Ok(())
177    }
178}
179
#[cfg(test)]
mod tests {
    use saluki_core::data_model::event::{metric::Metric, Event};

    use super::*;

    /// Attempts to build a router from the given names, returning the raw result.
    fn build_router(names: &[&str]) -> Result<MetricRouter, GenericError> {
        MetricRouter::new(MetricRouterConfiguration {
            metric_names: names.iter().map(|name| name.to_string()).collect(),
        })
    }

    /// Wraps a counter metric with the given name and value in an `Event`.
    fn counter_event(name: &'static str, value: f64) -> Event {
        Event::Metric(Metric::counter(name, value))
    }

    #[test]
    fn test_exact_metric_name_matching() {
        let router = build_router(&["test.counter", "another.metric"]).expect("Should create router successfully");

        // Every configured name is routed to "matched".
        for (name, value) in [("test.counter", 1.0), ("another.metric", 5.0)] {
            assert!(router.should_route_to_matched(&counter_event(name, value)));
        }

        // An unrelated name, and a name that only extends a configured one,
        // must not match: the comparison is exact.
        for name in ["different.metric", "test.counter.extra"] {
            assert!(!router.should_route_to_matched(&counter_event(name, 1.0)));
        }
    }

    #[test]
    fn test_empty_metric_names_list() {
        let router = build_router(&[]).expect("Should create router successfully");

        // With nothing configured, no metric can be routed to "matched".
        assert!(!router.should_route_to_matched(&counter_event("any.metric", 1.0)));
    }

    #[test]
    fn test_input_validation() {
        // Empty and whitespace-only names are both rejected with an error
        // whose message mentions "empty".
        for blank in ["", "   "] {
            let result = build_router(&["valid.metric", blank]);
            assert!(result.is_err_and(|error| error.to_string().contains("empty")));
        }

        // A list made only of non-blank names is accepted.
        assert!(build_router(&["valid.metric", "another.valid"]).is_ok());
    }
}