saluki_components/transforms/metric_router/
mod.rs1use std::{collections::HashSet, sync::LazyLock};
2
3use async_trait::async_trait;
4use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_core::{
6 components::{
7 transforms::{Transform, TransformBuilder, TransformContext},
8 ComponentContext,
9 },
10 data_model::event::{Event, EventType},
11 topology::{EventsBuffer, OutputDefinition},
12};
13use saluki_error::{generic_error, GenericError};
14use serde::Deserialize;
15use tokio::select;
16use tracing::{debug, error};
17
18#[derive(Deserialize, Clone)]
20pub struct MetricRouterConfiguration {
21 pub metric_names: Vec<String>,
23}
24
25#[derive(Debug)]
30pub struct MetricRouter {
31 metric_names: HashSet<String>,
32}
33
34impl MetricRouter {
35 fn new(config: MetricRouterConfiguration) -> Result<Self, GenericError> {
36 for name in &config.metric_names {
37 if name.trim().is_empty() {
38 return Err(generic_error!("Metric names cannot be empty or whitespace-only"));
39 }
40 }
41
42 Ok(Self {
43 metric_names: config.metric_names.into_iter().collect(),
44 })
45 }
46
47 fn should_route_to_matched(&self, event: &Event) -> bool {
52 match event {
53 Event::Metric(metric) => {
54 let metric_name = metric.context().name();
56 self.metric_names.contains(metric_name.as_ref())
57 }
58 _ => {
59 false
61 }
62 }
63 }
64
65 async fn process_event_batch(
67 &self, mut events: EventsBuffer, context: &mut TransformContext,
68 ) -> Result<(), GenericError> {
69 let matched_events = events.extract(|event| self.should_route_to_matched(event));
71
72 let matched_count = context
73 .dispatcher()
74 .buffered_named("matched")?
75 .send_all(matched_events)
76 .await?;
77
78 let unmatched_count = context
79 .dispatcher()
80 .buffered_named("unmatched")?
81 .send_all(events)
82 .await?;
83
84 debug!(
85 matched_events = matched_count,
86 unmatched_events = unmatched_count,
87 "Successfully processed event batch."
88 );
89
90 Ok(())
91 }
92}
93
94#[async_trait]
95impl TransformBuilder for MetricRouterConfiguration {
96 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
97 Ok(Box::new(MetricRouter::new(self.clone())?))
98 }
99
100 fn input_event_type(&self) -> EventType {
101 EventType::all_bits()
102 }
103
104 fn outputs(&self) -> &[OutputDefinition] {
105 static OUTPUTS: LazyLock<Vec<OutputDefinition>> = LazyLock::new(|| {
106 vec![
107 OutputDefinition::named_output("matched", EventType::all_bits()),
108 OutputDefinition::named_output("unmatched", EventType::all_bits()),
109 ]
110 });
111 &OUTPUTS
112 }
113}
114
115impl MemoryBounds for MetricRouterConfiguration {
116 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
117 builder
118 .minimum()
119 .with_single_value::<MetricRouter>("component struct")
121 .with_fixed_amount(
124 "hashset overhead",
125 std::mem::size_of::<std::collections::HashSet<String>>(),
126 )
127 .with_fixed_amount(
130 "metric names strings",
131 self.metric_names
132 .iter()
133 .map(|name| name.len() + std::mem::size_of::<String>())
134 .sum::<usize>(),
135 )
136 .with_fixed_amount(
139 "hashset buckets",
140 self.metric_names.len() * std::mem::size_of::<Option<String>>() * 2, );
142 }
143}
144
145#[async_trait]
146impl Transform for MetricRouter {
147 async fn run(self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
148 let mut health = context.take_health_handle();
149
150 health.mark_ready();
151 debug!(
152 metric_names = ?self.metric_names,
153 "Metric Router transform started."
154 );
155
156 loop {
157 select! {
158 _ = health.live() => continue,
159 maybe_events = context.events().next() => match maybe_events {
160 Some(events) => {
161 if let Err(e) = self.process_event_batch(events, &mut context).await {
163 error!(error = %e, "Failed to process event batch.");
164 }
166 }
167 None => {
168 debug!("Event stream terminated, shutting down metric router transform");
169 break;
170 }
171 }
172 }
173 }
174
175 debug!("Metric Router transform stopped.");
176 Ok(())
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use saluki_core::data_model::event::{metric::Metric, Event};
183
184 use super::*;
185
186 #[test]
187 fn test_exact_metric_name_matching() {
188 let router = MetricRouter::new(MetricRouterConfiguration {
189 metric_names: vec!["test.counter".to_string(), "another.metric".to_string()],
190 })
191 .expect("Should create router successfully");
192
193 let matching_metric = Event::Metric(Metric::counter("test.counter", 1.0));
195 assert!(router.should_route_to_matched(&matching_metric));
196
197 let another_matching_metric = Event::Metric(Metric::counter("another.metric", 5.0));
199 assert!(router.should_route_to_matched(&another_matching_metric));
200
201 let non_matching_metric = Event::Metric(Metric::counter("different.metric", 1.0));
203 assert!(!router.should_route_to_matched(&non_matching_metric));
204
205 let partial_match_metric = Event::Metric(Metric::counter("test.counter.extra", 1.0));
207 assert!(!router.should_route_to_matched(&partial_match_metric));
208 }
209
210 #[test]
211 fn test_empty_metric_names_list() {
212 let router = MetricRouter::new(MetricRouterConfiguration { metric_names: vec![] })
213 .expect("Should create router successfully");
214
215 let metric_event = Event::Metric(Metric::counter("any.metric", 1.0));
217 assert!(!router.should_route_to_matched(&metric_event));
218 }
219
220 #[test]
221 fn test_input_validation() {
222 let result = MetricRouter::new(MetricRouterConfiguration {
224 metric_names: vec!["valid.metric".to_string(), "".to_string()],
225 });
226 assert!(result.is_err());
227 if let Err(error) = result {
228 assert!(error.to_string().contains("empty"));
229 }
230
231 let result = MetricRouter::new(MetricRouterConfiguration {
233 metric_names: vec!["valid.metric".to_string(), " ".to_string()],
234 });
235 assert!(result.is_err());
236 if let Err(error) = result {
237 assert!(error.to_string().contains("empty"));
238 }
239
240 let result = MetricRouter::new(MetricRouterConfiguration {
242 metric_names: vec!["valid.metric".to_string(), "another.valid".to_string()],
243 });
244 assert!(result.is_ok());
245 }
246}