saluki_env/configstream/
stream.rs

1use datadog_protos::agent::{config_event, ConfigSnapshot};
2use futures::StreamExt;
3use prost_types::value::Kind;
4use saluki_config::dynamic::ConfigUpdate;
5use saluki_config::GenericConfiguration;
6use saluki_error::GenericError;
7use serde_json::{Map, Value};
8use tokio::sync::mpsc;
9use tokio::time::Duration;
10use tracing::{debug, error, warn};
11
12use crate::helpers::remote_agent::RemoteAgentClient;
13
14/// Creates a new `ConfigStreamer` that receives a stream of config events from the remote agent.
15pub async fn create_config_stream(config: &GenericConfiguration) -> Result<mpsc::Receiver<ConfigUpdate>, GenericError> {
16    let (sender, receiver) = mpsc::channel(100);
17
18    let client = match RemoteAgentClient::from_configuration(config).await {
19        Ok(client) => client,
20        Err(e) => {
21            error!("Failed to create remote agent client: {}.", e);
22            return Err(e);
23        }
24    };
25
26    tokio::spawn(run_config_stream_event_loop(client, sender));
27
28    Ok(receiver)
29}
30
31async fn run_config_stream_event_loop(mut client: RemoteAgentClient, sender: mpsc::Sender<ConfigUpdate>) {
32    loop {
33        debug!("Establishing a new config stream connection to the core agent...");
34        let mut rac = client.stream_config_events();
35
36        while let Some(result) = rac.next().await {
37            match result {
38                Ok(event) => {
39                    let update = match event.event {
40                        Some(config_event::Event::Snapshot(snapshot)) => {
41                            let map = snapshot_to_map(&snapshot);
42                            Some(ConfigUpdate::Snapshot(map))
43                        }
44                        Some(config_event::Event::Update(update)) => {
45                            if let Some(setting) = update.setting {
46                                let v = proto_value_to_serde_value(&setting.value);
47                                Some(ConfigUpdate::Partial {
48                                    key: setting.key,
49                                    value: v,
50                                })
51                            } else {
52                                None
53                            }
54                        }
55                        None => {
56                            error!("Received a configuration update event with no data.");
57                            None
58                        }
59                    };
60
61                    if let Some(update) = update {
62                        if sender.send(update).await.is_err() {
63                            // The receiver was dropped
64                            warn!("Dynamic configuration channel closed. Config stream shutting down.");
65                            break;
66                        }
67                    }
68                }
69                Err(e) => error!("Error while reading config event stream: {}.", e),
70            }
71        }
72
73        debug!("Config stream ended, retrying in 5 seconds...");
74        tokio::time::sleep(Duration::from_secs(5)).await;
75    }
76}
77
78/// Converts a `ConfigSnapshot` into a single flat `serde_json::Value::Object` (a map).
79fn snapshot_to_map(snapshot: &ConfigSnapshot) -> Value {
80    let mut map = Map::new();
81
82    for setting in &snapshot.settings {
83        let value = proto_value_to_serde_value(&setting.value);
84        map.insert(setting.key.clone(), value);
85    }
86
87    Value::Object(map)
88}
89
90/// Recursively converts a `google::protobuf::Value` into a `serde_json::Value`.
91fn proto_value_to_serde_value(proto_val: &Option<prost_types::Value>) -> Value {
92    let Some(kind) = proto_val.as_ref().and_then(|v| v.kind.as_ref()) else {
93        return Value::Null;
94    };
95
96    match kind {
97        Kind::NullValue(_) => Value::Null,
98        Kind::NumberValue(n) => {
99            if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
100                Value::from(*n as i64)
101            } else {
102                Value::from(*n)
103            }
104        }
105        Kind::StringValue(s) => Value::String(s.clone()),
106        Kind::BoolValue(b) => Value::Bool(*b),
107        Kind::StructValue(s) => {
108            let json_map: Map<String, Value> = s
109                .fields
110                .iter()
111                .map(|(k, v)| (k.clone(), proto_value_to_serde_value(&Some(v.clone()))))
112                .collect();
113            Value::Object(json_map)
114        }
115
116        // If the value is a list, convert it to an array.
117        Kind::ListValue(l) => {
118            let json_list: Vec<Value> = l
119                .values
120                .iter()
121                .map(|v| proto_value_to_serde_value(&Some(v.clone())))
122                .collect();
123            Value::Array(json_list)
124        }
125    }
126}