saluki_components/transforms/autoscaling_failover_gateway/
mod.rs1use std::collections::HashSet;
4
5use async_trait::async_trait;
6use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
7use saluki_core::{
8 components::{
9 transforms::{Transform, TransformBuilder, TransformContext},
10 ComponentContext,
11 },
12 data_model::event::{metric::MetricValues, Event, EventType},
13 topology::{EventsBuffer, OutputDefinition},
14};
15use saluki_error::GenericError;
16use tokio::select;
17use tracing::{debug, error};
18
19use crate::config::AutoscalingFailoverConfiguration;
20
21pub struct AutoscalingFailoverGatewayConfiguration {
26 failover_config: AutoscalingFailoverConfiguration,
27}
28
29impl AutoscalingFailoverGatewayConfiguration {
30 pub fn new(failover_config: AutoscalingFailoverConfiguration) -> Self {
32 Self { failover_config }
33 }
34}
35
/// Forwarding behaviour of the gateway, chosen when the gateway is constructed.
#[derive(Debug)]
enum GatewayMode {
    /// The failover branch was not requested, so every event is dropped.
    Inactive,

    /// The failover branch was requested. Only counter, rate, gauge, and set metrics whose names appear in
    /// `allowlist` are forwarded.
    FilteredForward {
        /// Metric names permitted through the gateway (matched exactly).
        allowlist: HashSet<String>,
    },
}

/// Transform that gates metrics into the autoscaling failover branch.
pub struct AutoscalingFailoverGateway {
    /// Forwarding mode derived from the failover configuration.
    mode: GatewayMode,
}
49
50impl AutoscalingFailoverGateway {
51 fn new(failover_config: AutoscalingFailoverConfiguration) -> Self {
52 let mode = Self::mode_for_config(&failover_config);
53
54 Self { mode }
55 }
56
57 fn mode_for_config(failover_config: &AutoscalingFailoverConfiguration) -> GatewayMode {
58 if !failover_config.is_branch_requested() {
59 GatewayMode::Inactive
60 } else {
61 GatewayMode::FilteredForward {
62 allowlist: failover_config.metrics().iter().cloned().collect(),
63 }
64 }
65 }
66
67 fn should_forward(&self, event: &Event) -> bool {
68 let GatewayMode::FilteredForward { allowlist } = &self.mode else {
69 return false;
70 };
71 let Event::Metric(metric) = event else {
72 return false;
73 };
74 if !matches!(
75 metric.values(),
76 MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..)
77 ) {
78 return false;
79 }
80
81 allowlist.contains(metric.context().name().as_ref())
82 }
83
84 async fn process_event_batch(
85 &self, mut events: EventsBuffer, context: &mut TransformContext,
86 ) -> Result<(), GenericError> {
87 let input_events = events.len();
88 events.remove_if(|event| !self.should_forward(event));
89
90 let sent_events = context.dispatcher().buffered()?.send_all(events).await?;
91 let dropped_events = input_events - sent_events;
92 debug!(
93 sent_events,
94 dropped_events, "Autoscaling failover metrics gateway processed event batch."
95 );
96
97 Ok(())
98 }
99}
100
101#[async_trait]
102impl TransformBuilder for AutoscalingFailoverGatewayConfiguration {
103 async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
104 Ok(Box::new(AutoscalingFailoverGateway::new(self.failover_config.clone())))
105 }
106
107 fn input_event_type(&self) -> EventType {
108 EventType::Metric
109 }
110
111 fn outputs(&self) -> &[OutputDefinition<EventType>] {
112 static OUTPUTS: &[OutputDefinition<EventType>] = &[OutputDefinition::default_output(EventType::Metric)];
113 OUTPUTS
114 }
115}
116
117impl MemoryBounds for AutoscalingFailoverGatewayConfiguration {
118 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
119 let metrics = self.failover_config.metrics();
120 builder
121 .minimum()
122 .with_single_value::<AutoscalingFailoverGateway>("component struct")
123 .with_fixed_amount("hashset overhead", std::mem::size_of::<HashSet<String>>())
124 .with_fixed_amount(
125 "allowlist strings",
126 metrics
127 .iter()
128 .map(|name| name.len() + std::mem::size_of::<String>())
129 .sum::<usize>(),
130 )
131 .with_fixed_amount(
132 "hashset buckets",
133 metrics.len() * std::mem::size_of::<Option<String>>() * 2,
134 );
135 }
136}
137
138#[async_trait]
139impl Transform for AutoscalingFailoverGateway {
140 async fn run(self: Box<Self>, mut context: TransformContext) -> Result<(), GenericError> {
141 let mut health = context.take_health_handle();
142
143 health.mark_ready();
144 debug!(mode = ?self.mode, "Autoscaling failover metrics gateway transform started.");
145
146 loop {
147 select! {
148 _ = health.live() => continue,
149 maybe_events = context.events().next() => match maybe_events {
150 Some(events) => {
151 if let Err(e) = self.process_event_batch(events, &mut context).await {
152 error!(error = %e, "Autoscaling failover metrics gateway failed to process event batch.");
153 }
154 }
155 None => {
156 debug!("Event stream terminated, shutting down autoscaling failover metrics gateway transform.");
157 break;
158 }
159 },
160 }
161 }
162
163 debug!("Autoscaling failover metrics gateway transform stopped.");
164 Ok(())
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use std::time::Duration;
171
172 use saluki_config::ConfigurationLoader;
173 use saluki_core::data_model::event::{metric::Metric, Event};
174 use serde_json::json;
175
176 use super::*;
177
178 async fn gateway_from_config(value: serde_json::Value) -> AutoscalingFailoverGateway {
179 let (config, _) = ConfigurationLoader::for_tests(Some(value), None, false).await;
180 let failover_config = AutoscalingFailoverConfiguration::from_configuration(&config)
181 .expect("autoscaling failover configuration should deserialize");
182 AutoscalingFailoverGateway::new(failover_config)
183 }
184
185 #[tokio::test]
186 async fn inactive_gateway_drops_everything() {
187 let gw = gateway_from_config(json!({
188 "autoscaling": {
189 "failover": {
190 "enabled": false,
191 "metrics": ["allowed.metric"]
192 }
193 }
194 }))
195 .await;
196
197 assert!(!gw.should_forward(&Event::Metric(Metric::counter("allowed.metric", 1.0))));
198 }
199
200 #[tokio::test]
201 async fn empty_metric_allowlist_drops_everything() {
202 let gw = gateway_from_config(json!({
203 "autoscaling": {
204 "failover": {
205 "enabled": true,
206 "metrics": []
207 }
208 }
209 }))
210 .await;
211
212 assert!(!gw.should_forward(&Event::Metric(Metric::counter("allowed.metric", 1.0))));
213 }
214
215 #[tokio::test]
216 async fn active_gateway_forwards_only_allowed_series_metrics() {
217 let gw = gateway_from_config(json!({
218 "autoscaling": {
219 "failover": {
220 "enabled": true,
221 "metrics": ["allowed.counter", "allowed.gauge", "allowed.rate", "allowed.set"]
222 }
223 }
224 }))
225 .await;
226
227 assert!(gw.should_forward(&Event::Metric(Metric::counter("allowed.counter", 1.0))));
228 assert!(gw.should_forward(&Event::Metric(Metric::gauge("allowed.gauge", 1.0))));
229 assert!(gw.should_forward(&Event::Metric(Metric::rate(
230 "allowed.rate",
231 1.0,
232 Duration::from_secs(10)
233 ))));
234 assert!(gw.should_forward(&Event::Metric(Metric::set("allowed.set", "value"))));
235 assert!(!gw.should_forward(&Event::Metric(Metric::counter("blocked.counter", 1.0))));
236 }
237
238 #[tokio::test]
239 async fn active_gateway_drops_sketch_metrics_even_when_allowed() {
240 let gw = gateway_from_config(json!({
241 "autoscaling": {
242 "failover": {
243 "enabled": true,
244 "metrics": ["allowed.histogram", "allowed.distribution"]
245 }
246 }
247 }))
248 .await;
249
250 assert!(!gw.should_forward(&Event::Metric(Metric::histogram("allowed.histogram", [1.0, 2.0, 3.0]))));
251 assert!(!gw.should_forward(&Event::Metric(Metric::distribution(
252 "allowed.distribution",
253 [1.0, 2.0, 3.0]
254 ))));
255 }
256}