saluki_env/configstream/
stream.rs1use datadog_protos::agent::{config_event, ConfigSnapshot};
2use futures::StreamExt;
3use prost_types::value::Kind;
4use saluki_config::dynamic::ConfigUpdate;
5use saluki_config::GenericConfiguration;
6use saluki_error::GenericError;
7use serde_json::{Map, Value};
8use tokio::sync::mpsc;
9use tokio::time::Duration;
10use tracing::{debug, error, warn};
11
12use crate::helpers::remote_agent::RemoteAgentClient;
13
14pub async fn create_config_stream(config: &GenericConfiguration) -> Result<mpsc::Receiver<ConfigUpdate>, GenericError> {
16 let (sender, receiver) = mpsc::channel(100);
17
18 let client = match RemoteAgentClient::from_configuration(config).await {
19 Ok(client) => client,
20 Err(e) => {
21 error!("Failed to create remote agent client: {}.", e);
22 return Err(e);
23 }
24 };
25
26 tokio::spawn(run_config_stream_event_loop(client, sender));
27
28 Ok(receiver)
29}
30
31async fn run_config_stream_event_loop(mut client: RemoteAgentClient, sender: mpsc::Sender<ConfigUpdate>) {
32 loop {
33 debug!("Establishing a new config stream connection to the core agent...");
34 let mut rac = client.stream_config_events();
35
36 while let Some(result) = rac.next().await {
37 match result {
38 Ok(event) => {
39 let update = match event.event {
40 Some(config_event::Event::Snapshot(snapshot)) => {
41 let map = snapshot_to_map(&snapshot);
42 Some(ConfigUpdate::Snapshot(map))
43 }
44 Some(config_event::Event::Update(update)) => {
45 if let Some(setting) = update.setting {
46 let v = proto_value_to_serde_value(&setting.value);
47 Some(ConfigUpdate::Partial {
48 key: setting.key,
49 value: v,
50 })
51 } else {
52 None
53 }
54 }
55 None => {
56 error!("Received a configuration update event with no data.");
57 None
58 }
59 };
60
61 if let Some(update) = update {
62 if sender.send(update).await.is_err() {
63 warn!("Dynamic configuration channel closed. Config stream shutting down.");
65 break;
66 }
67 }
68 }
69 Err(e) => error!("Error while reading config event stream: {}.", e),
70 }
71 }
72
73 debug!("Config stream ended, retrying in 5 seconds...");
74 tokio::time::sleep(Duration::from_secs(5)).await;
75 }
76}
77
78fn snapshot_to_map(snapshot: &ConfigSnapshot) -> Value {
80 let mut map = Map::new();
81
82 for setting in &snapshot.settings {
83 let value = proto_value_to_serde_value(&setting.value);
84 map.insert(setting.key.clone(), value);
85 }
86
87 Value::Object(map)
88}
89
90fn proto_value_to_serde_value(proto_val: &Option<prost_types::Value>) -> Value {
92 let Some(kind) = proto_val.as_ref().and_then(|v| v.kind.as_ref()) else {
93 return Value::Null;
94 };
95
96 match kind {
97 Kind::NullValue(_) => Value::Null,
98 Kind::NumberValue(n) => {
99 if n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
100 Value::from(*n as i64)
101 } else {
102 Value::from(*n)
103 }
104 }
105 Kind::StringValue(s) => Value::String(s.clone()),
106 Kind::BoolValue(b) => Value::Bool(*b),
107 Kind::StructValue(s) => {
108 let json_map: Map<String, Value> = s
109 .fields
110 .iter()
111 .map(|(k, v)| (k.clone(), proto_value_to_serde_value(&Some(v.clone()))))
112 .collect();
113 Value::Object(json_map)
114 }
115
116 Kind::ListValue(l) => {
118 let json_list: Vec<Value> = l
119 .values
120 .iter()
121 .map(|v| proto_value_to_serde_value(&Some(v.clone())))
122 .collect();
123 Value::Array(json_list)
124 }
125 }
126}