1#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10 error::Kind,
11 providers::{Env, Serialized},
12 Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::Snafu;
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod duration_string;
21pub mod dynamic;
22mod provider;
23pub mod space_separated;
24
25pub use self::duration_string::{parse_duration, DurationString, ParseDurationError};
26pub use self::dynamic::FieldUpdateWatcher;
27use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
28use self::provider::ResolvedProvider;
29pub use self::space_separated::{deserialize_opt_space_separated_or_seq, deserialize_space_separated_or_seq};
30
31#[derive(Clone)]
32struct ArcProvider(Arc<dyn Provider + Send + Sync>);
33
34impl Provider for ArcProvider {
35 fn metadata(&self) -> figment::Metadata {
36 self.0.metadata()
37 }
38
39 fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
40 self.0.data()
41 }
42}
43
44enum ProviderSource {
45 Static(ArcProvider),
46 Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
47}
48
49impl Clone for ProviderSource {
50 fn clone(&self) -> Self {
51 match self {
52 Self::Static(p) => Self::Static(p.clone()),
53 Self::Dynamic(_) => Self::Dynamic(None),
54 }
55 }
56}
57
58#[derive(Debug, Snafu)]
60#[snafu(context(suffix(false)))]
61pub enum ConfigurationError {
62 #[snafu(display("Environment variable prefix must not be empty."))]
64 EmptyPrefix,
65
66 #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
68 MissingField {
69 help_text: String,
74
75 field: Cow<'static, str>,
77 },
78
79 #[snafu(display(
81 "Expected value for field '{}' to be '{}', got '{}' instead.",
82 field,
83 expected_ty,
84 actual_ty
85 ))]
86 InvalidFieldType {
87 field: String,
91
92 expected_ty: String,
94
95 actual_ty: String,
97 },
98
99 #[snafu(transparent)]
101 Generic {
102 source: GenericError,
104 },
105}
106
107impl From<figment::Error> for ConfigurationError {
108 fn from(e: figment::Error) -> Self {
109 match e.kind {
110 Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
111 field: e.path.join("."),
112 expected_ty,
113 actual_ty: actual_ty.to_string(),
114 },
115 _ => Self::Generic { source: e.into() },
116 }
117 }
118}
119
120#[derive(Clone, Debug, Eq, Hash, PartialEq)]
121enum LookupSource {
122 Environment { prefix: String },
124}
125
126impl LookupSource {
127 fn transform_key(&self, key: &str) -> String {
128 match self {
129 LookupSource::Environment { prefix } => format!("{}{}", prefix, key.replace('.', "_").to_uppercase()),
132 }
133 }
134}
135
136#[derive(Clone, Default)]
152pub struct ConfigurationLoader {
153 key_aliases: &'static [(&'static str, &'static str)],
154 lookup_sources: HashSet<LookupSource>,
155 provider_sources: Vec<ProviderSource>,
156}
157
158impl ConfigurationLoader {
159 pub fn with_key_aliases(mut self, aliases: &'static [(&'static str, &'static str)]) -> Self {
168 self.key_aliases = aliases;
169 self
170 }
171
172 pub fn add_providers<P, I>(mut self, providers: I) -> Self
182 where
183 P: Provider + Send + Sync + 'static,
184 I: IntoIterator<Item = P>,
185 {
186 for p in providers {
187 self.provider_sources
188 .push(ProviderSource::Static(ArcProvider(Arc::new(p))));
189 }
190 self
191 }
192
193 pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
199 where
200 P: AsRef<std::path::Path>,
201 {
202 let resolved_provider = ResolvedProvider::from_yaml(&path, self.key_aliases)?;
203 self.provider_sources
204 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
205 Ok(self)
206 }
207
208 pub fn try_from_yaml<P>(mut self, path: P) -> Self
212 where
213 P: AsRef<std::path::Path>,
214 {
215 match ResolvedProvider::from_yaml(&path, self.key_aliases) {
216 Ok(resolved_provider) => {
217 self.provider_sources
218 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
219 }
220 Err(e) => {
221 println!(
222 "Unable to read YAML configuration file '{}': {}. Ignoring.",
223 path.as_ref().to_string_lossy(),
224 e
225 );
226 }
227 }
228 self
229 }
230
231 pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
237 where
238 P: AsRef<std::path::Path>,
239 {
240 let resolved_provider = ResolvedProvider::from_json(&path, self.key_aliases)?;
241 self.provider_sources
242 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
243 Ok(self)
244 }
245
246 pub fn try_from_json<P>(mut self, path: P) -> Self
250 where
251 P: AsRef<std::path::Path>,
252 {
253 match ResolvedProvider::from_json(&path, self.key_aliases) {
254 Ok(resolved_provider) => {
255 self.provider_sources
256 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
257 }
258 Err(e) => {
259 println!(
260 "Unable to read JSON configuration file '{}': {}. Ignoring.",
261 path.as_ref().to_string_lossy(),
262 e
263 );
264 }
265 }
266 self
267 }
268
269 pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
282 if prefix.is_empty() {
283 return Err(ConfigurationError::EmptyPrefix);
284 }
285
286 let prefix = if prefix.ends_with('_') {
287 prefix.to_string()
288 } else {
289 format!("{}_", prefix)
290 };
291
292 let env = Env::prefixed(&prefix).split("__");
294 let values = env.data().unwrap();
295 if let Some(default_dict) = values.get(&figment::Profile::Default) {
296 self.provider_sources
297 .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
298 default_dict.clone(),
299 )))));
300 self.lookup_sources.insert(LookupSource::Environment { prefix });
301 }
302 Ok(self)
303 }
304
305 pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
309 self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
310 self
311 }
312
313 pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
319 where
320 T: Deserialize<'a>,
321 {
322 let figment = build_figment_from_sources(&self.provider_sources);
323 figment.extract().map_err(Into::into)
324 }
325
326 pub fn bootstrap_generic(&self) -> GenericConfiguration {
331 let figment = build_figment_from_sources(&self.provider_sources);
332
333 GenericConfiguration {
334 inner: Arc::new(Inner {
335 figment: RwLock::new(figment),
336 lookup_sources: self.lookup_sources.clone(),
337 event_sender: None,
338 ready_signal: Mutex::new(None),
339 }),
340 }
341 }
342
343 pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
345 let has_dynamic_provider = self
346 .provider_sources
347 .iter()
348 .any(|s| matches!(s, ProviderSource::Dynamic(_)));
349
350 if has_dynamic_provider {
351 let mut receiver_opt = None;
352 for source in self.provider_sources.iter_mut() {
353 if let ProviderSource::Dynamic(ref mut receiver) = source {
354 receiver_opt = receiver.take();
355 break;
356 }
357 }
358 let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
359
360 let figment = build_figment_from_sources(&self.provider_sources);
362
363 let (event_sender, _) = broadcast::channel(100);
364 let (ready_sender, ready_receiver) = oneshot::channel();
365
366 let generic_config = GenericConfiguration {
367 inner: Arc::new(Inner {
368 figment: RwLock::new(figment),
369 lookup_sources: self.lookup_sources,
370 event_sender: Some(event_sender.clone()),
371 ready_signal: Mutex::new(Some(ready_receiver)),
372 }),
373 };
374
375 tokio::spawn(run_dynamic_config_updater(
377 generic_config.inner.clone(),
378 receiver,
379 self.provider_sources,
380 event_sender,
381 ready_sender,
382 ));
383
384 Ok(generic_config)
385 } else {
386 let figment = build_figment_from_sources(&self.provider_sources);
388
389 Ok(GenericConfiguration {
390 inner: Arc::new(Inner {
391 figment: RwLock::new(figment),
392 lookup_sources: self.lookup_sources,
393 event_sender: None,
394 ready_signal: Mutex::new(None),
395 }),
396 })
397 }
398 }
399
400 pub async fn for_tests(
411 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
412 enable_dynamic_configuration: bool,
413 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
414 Self::for_tests_with_provider_factory(file_values, env_vars, enable_dynamic_configuration, &[], || {
415 Serialized::defaults(serde_json::json!({}))
416 })
417 .await
418 }
419
420 pub async fn for_tests_with_provider_factory<P, F>(
429 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
430 enable_dynamic_configuration: bool, key_aliases: &'static [(&'static str, &'static str)], provider_factory: F,
431 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>)
432 where
433 P: Provider + Send + Sync + 'static,
434 F: FnOnce() -> P,
435 {
436 let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
437 let path = &json_file.path();
438 let json_to_write = file_values.unwrap_or(serde_json::json!({}));
439 serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
440
441 let mut loader = ConfigurationLoader::default()
442 .with_key_aliases(key_aliases)
443 .try_from_json(path);
444 let mut maybe_sender = None;
445 if enable_dynamic_configuration {
446 let (sender, receiver) = tokio::sync::mpsc::channel(1);
447 loader = loader.with_dynamic_configuration(receiver);
448 maybe_sender = Some(sender);
449 }
450
451 static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
452
453 let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
454
455 if let Some(pairs) = env_vars.as_ref() {
456 for (k, v) in pairs.iter() {
457 std::env::set_var(k, v);
461 std::env::set_var(format!("TEST_{}", k), v);
462 }
463 }
464
465 let loader = loader.add_providers([provider_factory()]);
467
468 let loader = loader
470 .from_environment("TEST")
471 .expect("should not fail to add environment provider");
472
473 if let Some(pairs) = env_vars.as_ref() {
475 for (k, _) in pairs.iter() {
476 std::env::remove_var(k);
477 std::env::remove_var(format!("TEST_{}", k));
478 }
479 }
480
481 drop(guard);
482
483 let cfg = loader
484 .into_generic()
485 .await
486 .expect("should not fail to build generic configuration");
487
488 (cfg, maybe_sender)
489 }
490}
491
492fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
493 sources.iter().fold(Figment::new(), |figment, source| match source {
494 ProviderSource::Static(p) => figment.admerge(p.clone()),
495 ProviderSource::Dynamic(_) => figment,
497 })
498}
499
500pub fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
504 if !root.is_object() {
505 *root = serde_json::Value::Object(serde_json::Map::new());
506 }
507
508 let mut current = root;
509 let mut segments = key.split('.').peekable();
511
512 while let Some(seg) = segments.next() {
513 let is_leaf = segments.peek().is_none();
514
515 if !current.is_object() {
517 *current = serde_json::Value::Object(serde_json::Map::new());
518 }
519 let node = current.as_object_mut().expect("current node should be an object");
520
521 if is_leaf {
522 node.insert(seg.to_string(), value);
523 break;
524 } else {
525 let should_create_node = match node.get(seg) {
527 Some(v) => !v.is_object(),
528 None => true,
529 };
530 if should_create_node {
532 node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
533 }
534
535 current = node.get_mut(seg).expect("should not fail to get nested object");
537 }
538 }
539}
540
541async fn run_dynamic_config_updater(
542 inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
543 sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
544) {
545 let initial_update = match receiver.recv().await {
547 Some(update) => update,
548 None => {
549 debug!("Dynamic configuration channel closed before initial snapshot.");
551 return;
552 }
553 };
554
555 let mut dynamic_state = match initial_update {
556 ConfigUpdate::Snapshot(state) => state,
557 ConfigUpdate::Partial { .. } => {
558 error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
560 serde_json::Value::Null
561 }
562 };
563
564 let new_figment = provider_sources
566 .iter()
567 .fold(Figment::new(), |figment, source| match source {
568 ProviderSource::Static(p) => figment.admerge(p.clone()),
569 ProviderSource::Dynamic(_) => {
570 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
571 }
572 });
573
574 {
576 let mut figment_guard = inner.figment.write().unwrap();
577 *figment_guard = new_figment.clone();
578 }
579
580 if ready_sender.send(()).is_err() {
582 debug!("Configuration readiness receiver dropped. Updater task shutting down.");
583 return;
584 }
585
586 let mut current_config: figment::value::Value = new_figment.extract().unwrap();
588
589 loop {
591 let update = match receiver.recv().await {
592 Some(update) => update,
593 None => {
594 debug!("Dynamic configuration update channel closed. Updater task shutting down.");
596 return;
597 }
598 };
599
600 match update {
602 ConfigUpdate::Snapshot(new_state) => {
603 debug!("Received configuration snapshot update.");
604 dynamic_state = new_state;
605 }
606 ConfigUpdate::Partial { key, value } => {
607 debug!(%key, "Received partial configuration update.");
608 if dynamic_state.is_null() {
609 dynamic_state = serde_json::Value::Object(serde_json::Map::new());
610 }
611 if dynamic_state.is_object() {
612 upsert(&mut dynamic_state, &key, value);
613 } else {
614 error!(
615 "Received partial update but current dynamic state is not an object. This should not happen."
616 );
617 }
618 }
619 }
620
621 let new_figment = provider_sources
623 .iter()
624 .fold(Figment::new(), |figment, source| match source {
625 ProviderSource::Static(p) => figment.admerge(p.clone()),
626 ProviderSource::Dynamic(_) => {
627 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
628 }
629 });
630
631 let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
632
633 if current_config != new_config {
634 for change in dynamic::diff_config(¤t_config, &new_config) {
635 let _ = sender.send(change);
639 }
640
641 let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
642 error!("Failed to acquire write lock for dynamic configuration: {}", e);
643 e.into_inner()
644 });
645 *figment_guard = new_figment;
646
647 current_config = new_config;
649 }
650 }
651}
652
653#[derive(Debug)]
654struct Inner {
655 figment: RwLock<Figment>,
656 lookup_sources: HashSet<LookupSource>,
657 event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
658 ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
659}
660
661#[derive(Clone, Debug)]
683pub struct GenericConfiguration {
684 inner: Arc<Inner>,
685}
686
687impl GenericConfiguration {
688 pub async fn ready(&self) {
695 let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
698 if let Some(ready_rx) = maybe_ready_rx.take() {
699 if ready_rx.await.is_err() {
701 error!("Failed to receive configuration readiness signal; updater task may have panicked.");
702 }
703 }
704 }
705
706 fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
707 where
708 T: Deserialize<'a>,
709 {
710 let figment_guard = self.inner.figment.read().unwrap();
711 match figment_guard.extract_inner(key) {
712 Ok(value) => Ok(value),
713 Err(e) => {
714 if matches!(e.kind, figment::error::Kind::MissingField(_)) {
715 let fallback_key = key.replace('.', "_");
720 figment_guard
721 .extract_inner(&fallback_key)
722 .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
723 } else {
724 Err(e.into())
725 }
726 }
727 }
728 }
729
730 pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
739 where
740 T: Deserialize<'a>,
741 {
742 self.get(key)
743 }
744
745 pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
752 where
753 T: Default + Deserialize<'a>,
754 {
755 self.get(key).unwrap_or_default()
756 }
757
758 pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
769 where
770 T: Deserialize<'a>,
771 {
772 match self.get(key) {
773 Ok(value) => Ok(Some(value)),
774 Err(ConfigurationError::MissingField { .. }) => Ok(None),
775 Err(e) => Err(e),
776 }
777 }
778
779 pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
785 where
786 T: Deserialize<'a>,
787 {
788 self.inner
789 .figment
790 .read()
791 .unwrap()
792 .extract()
793 .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
794 }
795
796 pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
798 self.inner.event_sender.as_ref().map(|s| s.subscribe())
799 }
800
801 pub fn flattened_keys(&self) -> Result<Vec<(String, serde_json::Value)>, ConfigurationError> {
811 let root: serde_json::Value = self.as_typed()?;
812 let mut out = Vec::new();
813 flatten_value(&root, &mut String::new(), &mut out);
814 Ok(out)
815 }
816
817 pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
822 FieldUpdateWatcher {
823 key: key.to_string(),
824 rx: self.subscribe_for_updates(),
825 }
826 }
827}
828
829fn flatten_value(value: &serde_json::Value, prefix: &mut String, out: &mut Vec<(String, serde_json::Value)>) {
831 if let serde_json::Value::Object(map) = value {
832 for (key, child) in map {
833 let prev_len = prefix.len();
834 if !prefix.is_empty() {
835 prefix.push('.');
836 }
837 prefix.push_str(key);
838 flatten_value(child, prefix, out);
839 prefix.truncate(prev_len);
840 }
841 } else {
842 out.push((prefix.clone(), value.clone()));
843 }
844}
845
846fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
847 match e.kind {
848 Kind::MissingField(field) => {
849 let mut valid_keys = lookup_sources
850 .iter()
851 .map(|source| source.transform_key(&field))
852 .collect::<Vec<_>>();
853
854 valid_keys.insert(0, field.to_string());
856
857 let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
858
859 ConfigurationError::MissingField { help_text, field }
860 }
861 Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
862 field: e.path.join("."),
863 expected_ty,
864 actual_ty: actual_ty.to_string(),
865 },
866 _ => ConfigurationError::Generic { source: e.into() },
867 }
868}
869
870#[cfg(test)]
871mod tests {
872 use super::*;
873
874 #[tokio::test]
875 async fn test_static_configuration() {
876 let (cfg, _) = ConfigurationLoader::for_tests(
877 Some(serde_json::json!({
878 "foo": "bar",
879 "baz": 5,
880 "foobar": { "a": false, "b": "c" }
881 })),
882 Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
883 false,
884 )
885 .await;
886 cfg.ready().await;
887
888 assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
889 assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
890 assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
891 assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
892 assert!(matches!(
893 cfg.get::<String>("nonexistentKey"),
894 Err(ConfigurationError::MissingField { .. })
895 ));
896 }
897
898 #[tokio::test]
899 async fn test_dynamic_configuration() {
900 let (cfg, sender) = ConfigurationLoader::for_tests(
901 Some(serde_json::json!({
902 "foo": "bar",
903 "baz": 5,
904 "foobar": { "a": false, "b": "c" }
905 })),
906 Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
907 true,
908 )
909 .await;
910 let sender = sender.expect("sender should exist");
911 sender
912 .send(ConfigUpdate::Snapshot(serde_json::json!({
913 "new": "from_snapshot",
914 })))
915 .await
916 .unwrap();
917
918 cfg.ready().await;
919
920 assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
922
923 assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");
925
926 let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
927
928 sender
929 .send(ConfigUpdate::Partial {
930 key: "new_key".to_string(),
931 value: "from dynamic update".to_string().into(),
932 })
933 .await
934 .unwrap();
935
936 tokio::time::timeout(std::time::Duration::from_secs(2), async {
937 loop {
938 match rx.recv().await {
939 Ok(ev) if ev.key == "new_key" => break ev,
940 Err(e) => panic!("updates channel closed: {e}"),
941 Ok(_) => continue,
942 }
943 }
944 })
945 .await
946 .expect("timed out waiting for new_key update");
947
948 assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");
949
950 sender
952 .send(ConfigUpdate::Partial {
953 key: "foobar.a".to_string(),
954 value: serde_json::json!(true),
955 })
956 .await
957 .unwrap();
958
959 tokio::time::timeout(std::time::Duration::from_secs(2), async {
960 loop {
961 match rx.recv().await {
962 Ok(ev) if ev.key == "foobar.a" => break ev,
963 Err(e) => panic!("updates channel closed: {e}"),
964 Ok(_) => continue,
965 }
966 }
967 })
968 .await
969 .expect("timed out waiting for foobar.a update");
970
971 assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
972 assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
973 }
974
975 #[tokio::test]
976 async fn test_environment_precedence_over_dynamic() {
977 let (cfg, sender) = ConfigurationLoader::for_tests(
978 Some(serde_json::json!({
979 "foo": "bar",
980 "baz": 5,
981 "foobar": { "a": false, "b": "c" }
982 })),
983 Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
984 true,
985 )
986 .await;
987 let sender = sender.expect("sender should exist");
988
989 sender
990 .send(ConfigUpdate::Snapshot(serde_json::json!({
991 "env_var": "from_snapshot_env_var"
992 })))
993 .await
994 .unwrap();
995
996 cfg.ready().await;
997
998 assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
1000
1001 let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
1002
1003 sender
1005 .send(ConfigUpdate::Partial {
1006 key: "env_var".to_string(),
1007 value: serde_json::json!("from_partial"),
1008 })
1009 .await
1010 .unwrap();
1011
1012 sender
1014 .send(ConfigUpdate::Partial {
1015 key: "foobar.a".to_string(),
1016 value: serde_json::json!(false),
1017 })
1018 .await
1019 .unwrap();
1020
1021 sender
1023 .send(ConfigUpdate::Partial {
1024 key: "dummy".to_string(),
1025 value: serde_json::json!(1),
1026 })
1027 .await
1028 .unwrap();
1029
1030 tokio::time::timeout(std::time::Duration::from_secs(2), async {
1031 loop {
1032 match rx.recv().await {
1033 Ok(ev) if ev.key == "dummy" => break,
1034 Err(e) => panic!("updates channel closed: {e}"),
1035 Ok(_) => continue,
1036 }
1037 }
1038 })
1039 .await
1040 .expect("timed out waiting for sync marker");
1041
1042 assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
1043 }
1044
1045 #[tokio::test]
1046 async fn test_dynamic_configuration_add_new_nested_key() {
1047 let (cfg, sender) = ConfigurationLoader::for_tests(
1048 Some(serde_json::json!({
1049 "foo": "bar",
1050 "baz": 5,
1051 "foobar": { "a": false, "b": "c" }
1052 })),
1053 None,
1054 true,
1055 )
1056 .await;
1057 let sender = sender.expect("sender should exist");
1058
1059 sender
1060 .send(ConfigUpdate::Snapshot(serde_json::json!({})))
1061 .await
1062 .unwrap();
1063 cfg.ready().await;
1064
1065 let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
1066
1067 sender
1068 .send(ConfigUpdate::Partial {
1069 key: "new_parent.new_child".to_string(),
1070 value: serde_json::json!(42),
1071 })
1072 .await
1073 .unwrap();
1074
1075 tokio::time::timeout(std::time::Duration::from_secs(2), async {
1077 loop {
1078 match rx.recv().await {
1079 Ok(ev) if ev.key == "new_parent" => break ev,
1080 Err(e) => panic!("updates channel closed: {e}"),
1081 Ok(_) => continue,
1082 }
1083 }
1084 })
1085 .await
1086 .expect("timed out waiting for new_parent.new_child update");
1087
1088 assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_underscore_fallback_on_get() {
1093 let (cfg, _) = ConfigurationLoader::for_tests(
1094 Some(serde_json::json!({})),
1095 Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
1096 false,
1097 )
1098 .await;
1099 cfg.ready().await;
1100
1101 assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
1102 }
1103
1104 #[tokio::test]
1105 async fn test_static_configuration_ready_and_subscribe() {
1106 let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
1107 assert!(maybe_sender.is_none());
1108
1109 tokio::time::timeout(std::time::Duration::from_millis(500), cfg.ready())
1110 .await
1111 .expect("ready() should not block when dynamic is disabled");
1112
1113 assert!(cfg.subscribe_for_updates().is_none());
1114 }
1115
1116 #[tokio::test]
1117 async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
1118 let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
1120 assert!(maybe_sender.is_some());
1121
1122 let res = tokio::time::timeout(std::time::Duration::from_millis(1000), cfg.ready()).await;
1124 assert!(res.is_err(), "ready() should time out without an initial snapshot");
1125 }
1126
1127 #[tokio::test]
1128 async fn test_flattened_keys_flat_and_nested() {
1129 let (cfg, _) = ConfigurationLoader::for_tests(
1130 Some(serde_json::json!({
1131 "top": "value",
1132 "nested": { "a": 1, "b": { "c": true } }
1133 })),
1134 None,
1135 false,
1136 )
1137 .await;
1138 cfg.ready().await;
1139
1140 let pairs = cfg.flattened_keys().unwrap();
1141 let map: std::collections::HashMap<&str, &serde_json::Value> =
1142 pairs.iter().map(|(k, v)| (k.as_str(), v)).collect();
1143
1144 assert_eq!(map.get("top"), Some(&&serde_json::json!("value")));
1145 assert_eq!(map.get("nested.a"), Some(&&serde_json::json!(1)));
1146 assert_eq!(map.get("nested.b.c"), Some(&&serde_json::json!(true)));
1147 assert!(!map.contains_key("nested"));
1148 assert!(!map.contains_key("nested.b"));
1149 }
1150
1151 #[tokio::test]
1152 async fn test_flattened_keys_arrays_are_leaves() {
1153 let (cfg, _) = ConfigurationLoader::for_tests(
1154 Some(serde_json::json!({
1155 "tags": ["a", "b"],
1156 "matrix": [[1, 2], [3, 4]]
1157 })),
1158 None,
1159 false,
1160 )
1161 .await;
1162 cfg.ready().await;
1163
1164 let pairs = cfg.flattened_keys().unwrap();
1165 let map: std::collections::HashMap<&str, &serde_json::Value> =
1166 pairs.iter().map(|(k, v)| (k.as_str(), v)).collect();
1167
1168 assert_eq!(map.get("tags"), Some(&&serde_json::json!(["a", "b"])));
1169 assert_eq!(map.get("matrix"), Some(&&serde_json::json!([[1, 2], [3, 4]])));
1170 }
1171
1172 #[tokio::test]
1173 async fn test_flattened_keys_null_values_absent() {
1174 let (cfg, _) = ConfigurationLoader::for_tests(
1175 Some(serde_json::json!({
1176 "present": "yes",
1177 "absent": null
1178 })),
1179 None,
1180 false,
1181 )
1182 .await;
1183 cfg.ready().await;
1184
1185 let pairs = cfg.flattened_keys().unwrap();
1186 let map: std::collections::HashMap<&str, &serde_json::Value> =
1187 pairs.iter().map(|(k, v)| (k.as_str(), v)).collect();
1188
1189 assert_eq!(map.get("present"), Some(&&serde_json::json!("yes")));
1190 assert!(!map.contains_key("absent"));
1192 }
1193}