saluki_config/dynamic/
watcher.rs1use std::future::pending as pending_forever;
4
5use serde::de::DeserializeOwned;
6use tokio::sync::broadcast;
7use tracing::warn;
8
9use crate::dynamic::ConfigChangeEvent;
10
11pub struct FieldUpdateWatcher {
18 pub(crate) key: String,
20 pub(crate) rx: Option<broadcast::Receiver<ConfigChangeEvent>>,
22}
23
24impl FieldUpdateWatcher {
25 pub async fn changed<T>(&mut self) -> (Option<T>, Option<T>)
27 where
28 T: DeserializeOwned,
29 {
30 if self.rx.is_none() {
31 pending_forever::<()>().await;
32 unreachable!();
33 }
34
35 let rx = self.rx.as_mut().unwrap();
36 loop {
37 match rx.recv().await {
38 Ok(event) if event.key == self.key => {
39 let old_ref = event.old_value.as_ref();
40 let new_ref = event.new_value.as_ref();
41
42 let old_t = old_ref.and_then(|ov| serde_json::from_value::<T>(ov.clone()).ok());
43 let new_t = new_ref.and_then(|nv| serde_json::from_value::<T>(nv.clone()).ok());
44
45 if new_t.is_some() || old_t.is_some() {
46 return (old_t, new_t);
47 }
48
49 if new_ref.is_some() {
51 warn!(
52 key = %self.key,
53 expected = %std::any::type_name::<T>(),
54 actual = %get_type_name(new_ref.as_ref().unwrap()),
55 "FieldUpdateWatcher failed to deserialize new value. Skipping update."
56 );
57 }
58 }
59 Ok(_) => continue,
61 Err(broadcast::error::RecvError::Lagged(_)) => {
62 warn!(
63 "FieldUpdateWatcher dropped events for key: {}. Continuing to wait for the next event.",
64 self.key
65 );
66 continue;
67 }
68 Err(broadcast::error::RecvError::Closed) => {
69 pending_forever::<()>().await;
71 unreachable!();
72 }
73 }
74 }
75 }
76}
77
78fn get_type_name(value: &serde_json::Value) -> &'static str {
79 match value {
80 serde_json::Value::Null => "null",
81 serde_json::Value::Bool(_) => "bool",
82 serde_json::Value::Number(_) => "number",
83 serde_json::Value::String(_) => "string",
84 serde_json::Value::Array(_) => "array",
85 serde_json::Value::Object(_) => "object",
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use crate::dynamic::event::ConfigUpdate;
92 use crate::ConfigurationLoader;
93
94 #[tokio::test]
95 async fn test_basic_field_update_watcher() {
96 let (cfg, sender) = ConfigurationLoader::for_tests(
97 Some(serde_json::json!({ "foobar": { "a": false, "b": "c" } })),
98 None,
99 true,
100 )
101 .await;
102 let sender = sender.expect("sender should exist");
103
104 sender
105 .send(ConfigUpdate::Snapshot(serde_json::json!({})))
106 .await
107 .unwrap();
108 cfg.ready().await;
109
110 let mut watcher = cfg.watch_for_updates("watched_key");
111
112 sender
113 .send(ConfigUpdate::Partial {
114 key: "watched_key".to_string(),
115 value: serde_json::json!("hello"),
116 })
117 .await
118 .unwrap();
119
120 let (old, new) = tokio::time::timeout(std::time::Duration::from_secs(2), watcher.changed::<String>())
121 .await
122 .expect("timed out waiting for watched_key update");
123
124 assert_eq!(old, None);
125 assert_eq!(new, Some("hello".to_string()));
126 }
127
128 #[tokio::test]
129 async fn test_field_update_watcher_nested_key() {
130 let (cfg, sender) = ConfigurationLoader::for_tests(
131 Some(serde_json::json!({ "foobar": { "a": false, "b": "c" } })),
132 None,
133 true,
134 )
135 .await;
136 let sender = sender.expect("sender should exist");
137
138 sender
139 .send(ConfigUpdate::Snapshot(serde_json::json!({})))
140 .await
141 .unwrap();
142 cfg.ready().await;
143
144 let mut watcher = cfg.watch_for_updates("foobar.a");
145
146 sender
148 .send(ConfigUpdate::Partial {
149 key: "foobar.a".to_string(),
150 value: serde_json::json!(true),
151 })
152 .await
153 .unwrap();
154
155 let (old, new) = tokio::time::timeout(std::time::Duration::from_secs(2), watcher.changed::<bool>())
156 .await
157 .expect("timed out waiting for foobar.a update");
158
159 assert_eq!(old, Some(false));
160 assert_eq!(new, Some(true));
161 assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
162
163 assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
165 }
166
167 #[tokio::test]
168 async fn test_field_update_watcher_parent_update() {
169 let (cfg, sender) = ConfigurationLoader::for_tests(
170 Some(serde_json::json!({ "foobar": { "a": false, "b": "c" } })),
171 None,
172 true,
173 )
174 .await;
175 let sender = sender.expect("sender should exist");
176
177 sender
178 .send(ConfigUpdate::Snapshot(serde_json::json!({})))
179 .await
180 .unwrap();
181 cfg.ready().await;
182
183 let mut watcher = cfg.watch_for_updates("foobar.a");
184
185 sender
187 .send(ConfigUpdate::Partial {
188 key: "foobar".to_string(),
189 value: serde_json::json!({ "a": true }),
190 })
191 .await
192 .unwrap();
193
194 let (old, new) = tokio::time::timeout(std::time::Duration::from_secs(2), watcher.changed::<bool>())
195 .await
196 .expect("timed out waiting for foobar.a update");
197
198 assert_eq!(old, Some(false));
199 assert_eq!(new, Some(true));
200 assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
201
202 assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
204 }
205}