saluki_config/dynamic/
watcher.rs

1//! A watcher for a specific configuration key.
2
3use std::future::pending as pending_forever;
4
5use serde::de::DeserializeOwned;
6use tokio::sync::broadcast;
7use tracing::warn;
8
9use crate::dynamic::ConfigChangeEvent;
10
/// A watcher for a specific configuration key.
///
/// It filters the [`ConfigChangeEvent`]s it receives down to those whose key is equal to the
/// requested key.
///
/// If dynamic configuration is disabled, [`changed`](Self::changed) will wait indefinitely and never yield.
pub struct FieldUpdateWatcher {
    /// The configuration key to watch for updates.
    ///
    /// Compared for exact equality against the key of each received event.
    pub(crate) key: String,
    /// Receiver of global configuration change events (`None` when dynamic configuration is disabled).
    pub(crate) rx: Option<broadcast::Receiver<ConfigChangeEvent>>,
}
23
24impl FieldUpdateWatcher {
25    /// Waits until the watched key changes and returns a typed (old, new) tuple.
26    pub async fn changed<T>(&mut self) -> (Option<T>, Option<T>)
27    where
28        T: DeserializeOwned,
29    {
30        if self.rx.is_none() {
31            pending_forever::<()>().await;
32            unreachable!();
33        }
34
35        let rx = self.rx.as_mut().unwrap();
36        loop {
37            match rx.recv().await {
38                Ok(event) if event.key == self.key => {
39                    let old_ref = event.old_value.as_ref();
40                    let new_ref = event.new_value.as_ref();
41
42                    let old_t = old_ref.and_then(|ov| serde_json::from_value::<T>(ov.clone()).ok());
43                    let new_t = new_ref.and_then(|nv| serde_json::from_value::<T>(nv.clone()).ok());
44
45                    if new_t.is_some() || old_t.is_some() {
46                        return (old_t, new_t);
47                    }
48
49                    // If a new value was present but failed to deserialize, warn so we don't silently hide updates.
50                    if new_ref.is_some() {
51                        warn!(
52                            key = %self.key,
53                            expected = %std::any::type_name::<T>(),
54                            actual = %get_type_name(new_ref.as_ref().unwrap()),
55                            "FieldUpdateWatcher failed to deserialize new value. Skipping update."
56                        );
57                    }
58                }
59                // Ignore other key changes.
60                Ok(_) => continue,
61                Err(broadcast::error::RecvError::Lagged(_)) => {
62                    warn!(
63                        "FieldUpdateWatcher dropped events for key: {}. Continuing to wait for the next event.",
64                        self.key
65                    );
66                    continue;
67                }
68                Err(broadcast::error::RecvError::Closed) => {
69                    // Keep pending forever to match "might never fire" semantics.
70                    pending_forever::<()>().await;
71                    unreachable!();
72                }
73            }
74        }
75    }
76}
77
78fn get_type_name(value: &serde_json::Value) -> &'static str {
79    match value {
80        serde_json::Value::Null => "null",
81        serde_json::Value::Bool(_) => "bool",
82        serde_json::Value::Number(_) => "number",
83        serde_json::Value::String(_) => "string",
84        serde_json::Value::Array(_) => "array",
85        serde_json::Value::Object(_) => "object",
86    }
87}
88
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use serde_json::{json, Value};
    use tokio::time::timeout;

    use crate::dynamic::event::ConfigUpdate;
    use crate::ConfigurationLoader;

    /// Upper bound on how long a test waits for a watcher notification.
    const CHANGE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Initial configuration shared by every test: a `foobar` object with two leaves.
    fn initial_config() -> Value {
        json!({ "foobar": { "a": false, "b": "c" } })
    }

    /// A partial update to a key that is absent from the initial configuration is reported as
    /// `(None, Some(new))`.
    #[tokio::test]
    async fn test_basic_field_update_watcher() {
        let (config, update_tx) = ConfigurationLoader::for_tests(Some(initial_config()), None, true).await;
        let update_tx = update_tx.expect("sender should exist");

        // Send an empty snapshot, then wait for the configuration to report ready.
        let empty_snapshot = ConfigUpdate::Snapshot(json!({}));
        update_tx.send(empty_snapshot).await.unwrap();
        config.ready().await;

        let mut watcher = config.watch_for_updates("watched_key");

        let update = ConfigUpdate::Partial {
            key: String::from("watched_key"),
            value: json!("hello"),
        };
        update_tx.send(update).await.unwrap();

        let (previous, current) = timeout(CHANGE_TIMEOUT, watcher.changed::<String>())
            .await
            .expect("timed out waiting for watched_key update");

        assert_eq!(previous, None);
        assert_eq!(current, Some(String::from("hello")));
    }

    /// A partial update addressed by dotted path reports the old and new leaf values and leaves
    /// the sibling leaf readable.
    #[tokio::test]
    async fn test_field_update_watcher_nested_key() {
        let (config, update_tx) = ConfigurationLoader::for_tests(Some(initial_config()), None, true).await;
        let update_tx = update_tx.expect("sender should exist");

        // Send an empty snapshot, then wait for the configuration to report ready.
        let empty_snapshot = ConfigUpdate::Snapshot(json!({}));
        update_tx.send(empty_snapshot).await.unwrap();
        config.ready().await;

        let mut watcher = config.watch_for_updates("foobar.a");

        // Update nested value via dotted path
        let update = ConfigUpdate::Partial {
            key: String::from("foobar.a"),
            value: json!(true),
        };
        update_tx.send(update).await.unwrap();

        let (previous, current) = timeout(CHANGE_TIMEOUT, watcher.changed::<bool>())
            .await
            .expect("timed out waiting for foobar.a update");

        assert_eq!(previous, Some(false));
        assert_eq!(current, Some(true));
        assert!(config.get_typed::<bool>("foobar.a").unwrap());

        // Existing nested key not updated is still present
        assert_eq!(config.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    /// A partial update of the parent `foobar` object notifies a watcher on `foobar.a` and leaves
    /// the sibling leaf readable.
    #[tokio::test]
    async fn test_field_update_watcher_parent_update() {
        let (config, update_tx) = ConfigurationLoader::for_tests(Some(initial_config()), None, true).await;
        let update_tx = update_tx.expect("sender should exist");

        // Send an empty snapshot, then wait for the configuration to report ready.
        let empty_snapshot = ConfigUpdate::Snapshot(json!({}));
        update_tx.send(empty_snapshot).await.unwrap();
        config.ready().await;

        let mut watcher = config.watch_for_updates("foobar.a");

        // Update parent object directly
        let update = ConfigUpdate::Partial {
            key: String::from("foobar"),
            value: json!({ "a": true }),
        };
        update_tx.send(update).await.unwrap();

        let (previous, current) = timeout(CHANGE_TIMEOUT, watcher.changed::<bool>())
            .await
            .expect("timed out waiting for foobar.a update");

        assert_eq!(previous, Some(false));
        assert_eq!(current, Some(true));
        assert!(config.get_typed::<bool>("foobar.a").unwrap());

        // Existing nested key not updated is still present
        assert_eq!(config.get_typed::<String>("foobar.b").unwrap(), "c");
    }
}
205}