Skip to main content

saluki_components/transforms/autoscaling_failover_gateway/
mod.rs

1//! Autoscaling failover metrics gateway transform.
2
3use std::collections::HashSet;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_core::{
8    components::{
9        transforms::{Transform, TransformBuilder, TransformContext},
10        ComponentContext,
11    },
12    data_model::event::{metric::MetricValues, Event, EventType},
13    topology::{EventsBuffer, OutputDefinition},
14};
15use saluki_error::GenericError;
16use tokio::select;
17use tracing::{debug, error};
18
19use crate::config::AutoscalingFailoverConfiguration;
20
21/// Autoscaling failover metrics gateway transform configuration.
22///
23/// This transform sits between the shared metrics enrichment stage and the Cluster Agent encoder/forwarder branch. It
24/// forwards only configured series metrics to the Cluster Agent branch and drops everything else.
25pub struct AutoscalingFailoverGatewayConfiguration {
26    failover_config: AutoscalingFailoverConfiguration,
27}
28
29impl AutoscalingFailoverGatewayConfiguration {
30    /// Creates a new `AutoscalingFailoverGatewayConfiguration` from the given failover configuration.
31    pub fn new(failover_config: AutoscalingFailoverConfiguration) -> Self {
32        Self { failover_config }
33    }
34}
35
/// How the autoscaling failover metrics gateway treats incoming events.
#[derive(Debug)]
enum GatewayMode {
    /// The failover branch is not in use (disabled, or nothing is allowed through): every event is dropped.
    Inactive,
    /// The failover branch is in use: only series metrics whose name is in `allowlist` are forwarded.
    FilteredForward {
        /// Names of the metrics that may be forwarded.
        allowlist: HashSet<String>,
    },
}
44
45/// Autoscaling failover metrics gateway transform.
46pub struct AutoscalingFailoverGateway {
47    mode: GatewayMode,
48}
49
50impl AutoscalingFailoverGateway {
51    fn new(failover_config: AutoscalingFailoverConfiguration) -> Self {
52        let mode = Self::mode_for_config(&failover_config);
53
54        Self { mode }
55    }
56
57    fn mode_for_config(failover_config: &AutoscalingFailoverConfiguration) -> GatewayMode {
58        if !failover_config.is_branch_requested() {
59            GatewayMode::Inactive
60        } else {
61            GatewayMode::FilteredForward {
62                allowlist: failover_config.metrics().iter().cloned().collect(),
63            }
64        }
65    }
66
67    fn should_forward(&self, event: &Event) -> bool {
68        let GatewayMode::FilteredForward { allowlist } = &self.mode else {
69            return false;
70        };
71        let Event::Metric(metric) = event else {
72            return false;
73        };
74        if !matches!(
75            metric.values(),
76            MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
77        ) {
78            return false;
79        }
80
81        allowlist.contains(metric.context().name().as_ref())
82    }
83
84    async fn process_event_batch(
85        &self, mut events: EventsBuffer, context: &mut TransformContext,
86    ) -> Result<(), GenericError> {
87        let input_events = events.len();
88        events.remove_if(|event| !self.should_forward(event));
89
90        let sent_events = context.dispatcher().buffered()?.send_all(events).await?;
91        let dropped_events = input_events - sent_events;
92        debug!(
93            sent_events,
94            dropped_events, "Autoscaling failover metrics gateway processed event batch."
95        );
96
97        Ok(())
98    }
99}
100
101#[async_trait]
102impl TransformBuilder for AutoscalingFailoverGatewayConfiguration {
103    async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
104        Ok(Box::new(AutoscalingFailoverGateway::new(self.failover_config.clone())))
105    }
106
107    fn input_event_type(&self) -> EventType {
108        EventType::Metric
109    }
110
111    fn outputs(&self) -> &[OutputDefinition<EventType>] {
112        static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
113        OUTPUTS
114    }
115}
116
117impl MemoryBounds for AutoscalingFailoverGatewayConfiguration {
118    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
119        let metrics = self.failover_config.metrics();
120        builder
121            .minimum()
122            .with_single_value::<AutoscalingFailoverGateway>("component struct")
123            .with_fixed_amount("hashset overhead", std::mem::size_of::<HashSet<String>>())
124            .with_fixed_amount(
125                "allowlist strings",
126                metrics
127                    .iter()
128                    .map(|name| name.len() + std::mem::size_of::<String>())
129                    .sum::<usize>(),
130            )
131            .with_fixed_amount(
132                "hashset buckets",
133                metrics.len() * std::mem::size_of::<Option<String>>() * 2,
134            );
135    }
136}
137
138#[async_trait]
139impl Transform for AutoscalingFailoverGateway {
140    async fn run(self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
141        let mut health = context.take_health_handle();
142
143        health.mark_ready();
144        debug!(mode = ?self.mode, "Autoscaling failover metrics gateway transform started.");
145
146        loop {
147            select! {
148                _ = health.live() => continue,
149                maybe_events = context.events().next() => match maybe_events {
150                    Some(events) => {
151                        if let Err(e) = self.process_event_batch(events, &mut context).await {
152                            error!(error = %e, "Autoscaling failover metrics gateway failed to process event batch.");
153                        }
154                    }
155                    None => {
156                        debug!("Event stream terminated, shutting down autoscaling failover metrics gateway transform.");
157                        break;
158                    }
159                },
160            }
161        }
162
163        debug!("Autoscaling failover metrics gateway transform stopped.");
164        Ok(())
165    }
166}
167
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use saluki_config::ConfigurationLoader;
    use saluki_core::data_model::event::{metric::Metric, Event};
    use serde_json::json;

    use super::*;

    /// Builds the `autoscaling.failover` configuration document used by the tests below.
    fn failover_settings(enabled: bool, metrics: &[&str]) -> serde_json::Value {
        json!({
            "autoscaling": {
                "failover": {
                    "enabled": enabled,
                    "metrics": metrics
                }
            }
        })
    }

    /// Loads `value` as test configuration and constructs a gateway from the resulting failover settings.
    async fn gateway_from_config(value: serde_json::Value) -> AutoscalingFailoverGateway {
        let (loaded, _) = ConfigurationLoader::for_tests(Some(value), None, false).await;
        let parsed = AutoscalingFailoverConfiguration::from_configuration(&loaded)
            .expect("autoscaling failover configuration should deserialize");
        AutoscalingFailoverGateway::new(parsed)
    }

    #[tokio::test]
    async fn inactive_gateway_drops_everything() {
        let gateway = gateway_from_config(failover_settings(false, &["allowed.metric"])).await;

        let listed_counter = Event::Metric(Metric::counter("allowed.metric", 1.0));
        assert!(!gateway.should_forward(&listed_counter));
    }

    #[tokio::test]
    async fn empty_metric_allowlist_drops_everything() {
        let gateway = gateway_from_config(failover_settings(true, &[])).await;

        let counter = Event::Metric(Metric::counter("allowed.metric", 1.0));
        assert!(!gateway.should_forward(&counter));
    }

    #[tokio::test]
    async fn active_gateway_forwards_only_allowed_series_metrics() {
        let allowed = ["allowed.counter", "allowed.gauge", "allowed.rate", "allowed.set"];
        let gateway = gateway_from_config(failover_settings(true, &allowed)).await;

        let counter = Event::Metric(Metric::counter("allowed.counter", 1.0));
        assert!(gateway.should_forward(&counter));

        let gauge = Event::Metric(Metric::gauge("allowed.gauge", 1.0));
        assert!(gateway.should_forward(&gauge));

        let rate = Event::Metric(Metric::rate("allowed.rate", 1.0, Duration::from_secs(10)));
        assert!(gateway.should_forward(&rate));

        let set = Event::Metric(Metric::set("allowed.set", "value"));
        assert!(gateway.should_forward(&set));

        let unlisted = Event::Metric(Metric::counter("blocked.counter", 1.0));
        assert!(!gateway.should_forward(&unlisted));
    }

    #[tokio::test]
    async fn active_gateway_drops_sketch_metrics_even_when_allowed() {
        let allowed = ["allowed.histogram", "allowed.distribution"];
        let gateway = gateway_from_config(failover_settings(true, &allowed)).await;

        let histogram = Event::Metric(Metric::histogram("allowed.histogram", [1.0, 2.0, 3.0]));
        assert!(!gateway.should_forward(&histogram));

        let distribution = Event::Metric(Metric::distribution("allowed.distribution", [1.0, 2.0, 3.0]));
        assert!(!gateway.should_forward(&distribution));
    }
}