1#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10 error::Kind,
11 providers::{Env, Serialized},
12 Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32 fn metadata(&self) -> figment::Metadata {
33 self.0.metadata()
34 }
35
36 fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37 self.0.data()
38 }
39}
40
41enum ProviderSource {
42 Static(ArcProvider),
43 Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47 fn clone(&self) -> Self {
48 match self {
49 Self::Static(p) => Self::Static(p.clone()),
50 Self::Dynamic(_) => Self::Dynamic(None),
51 }
52 }
53}
54
/// Errors produced while loading configuration or reading values from it.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
pub enum ConfigurationError {
    /// The environment variable prefix given to the loader was empty.
    #[snafu(display("Environment variable prefix must not be empty."))]
    EmptyPrefix,

    /// A requested field was not present in the configuration.
    #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
    MissingField {
        /// Hint appended to the error message, listing the key names that could be set to provide the field.
        help_text: String,

        /// Name of the field that was missing.
        field: Cow<'static, str>,
    },

    /// A field was present but its value had a different type than the one requested.
    #[snafu(display(
        "Expected value for field '{}' to be '{}', got '{}' instead.",
        field,
        expected_ty,
        actual_ty
    ))]
    InvalidFieldType {
        /// Dot-separated path of the offending field.
        field: String,

        /// Description of the type that was expected.
        expected_ty: String,

        /// Description of the type that was actually found.
        actual_ty: String,
    },

    /// Any other error; displayed and sourced transparently from the wrapped error.
    #[snafu(transparent)]
    Generic {
        /// The underlying error.
        source: GenericError,
    },

    /// Secrets resolution failed.
    #[snafu(display("Failed to resolve secrets."))]
    Secrets {
        /// The underlying secrets error.
        source: secrets::Error,
    },
}
110
111impl From<figment::Error> for ConfigurationError {
112 fn from(e: figment::Error) -> Self {
113 match e.kind {
114 Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115 field: e.path.join("."),
116 expected_ty,
117 actual_ty: actual_ty.to_string(),
118 },
119 _ => Self::Generic { source: e.into() },
120 }
121 }
122}
123
/// A place, other than the configuration files, where a configuration key can be provided.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// Environment variables whose names start with `prefix`.
    Environment { prefix: String },
}

impl LookupSource {
    /// Returns the name under which `key` would be looked up in this source.
    ///
    /// For environment variables, dots become underscores, the key is uppercased, and the prefix is prepended.
    fn transform_key(&self, key: &str) -> String {
        // Irrefutable: the enum has exactly one variant.
        let LookupSource::Environment { prefix } = self;

        let variable_suffix = key.replace('.', "_").to_uppercase();
        let mut variable_name = String::with_capacity(prefix.len() + variable_suffix.len());
        variable_name.push_str(prefix);
        variable_name.push_str(&variable_suffix);
        variable_name
    }
}
139
/// Builder that collects configuration sources and turns them into a typed value or a [`GenericConfiguration`].
///
/// Sources are merged in the order they were added.
// NOTE(review): sources are combined with `Figment::admerge`, so a later source should win on conflicting keys —
// confirm against the figment documentation.
#[derive(Clone, Default)]
pub struct ConfigurationLoader {
    // Alternative places a key can be set from (currently only prefixed environment variables); used to build the
    // help text of missing-field errors.
    lookup_sources: HashSet<LookupSource>,
    // Registered providers, in the order they were added.
    provider_sources: Vec<ProviderSource>,
}
160
161impl ConfigurationLoader {
162 pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
168 where
169 P: AsRef<std::path::Path>,
170 {
171 let resolved_provider = ResolvedProvider::from_yaml(&path)?;
172 self.provider_sources
173 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
174 Ok(self)
175 }
176
177 pub fn try_from_yaml<P>(mut self, path: P) -> Self
181 where
182 P: AsRef<std::path::Path>,
183 {
184 match ResolvedProvider::from_yaml(&path) {
185 Ok(resolved_provider) => {
186 self.provider_sources
187 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
188 }
189 Err(e) => {
190 println!(
191 "Unable to read YAML configuration file '{}': {}. Ignoring.",
192 path.as_ref().to_string_lossy(),
193 e
194 );
195 }
196 }
197 self
198 }
199
200 pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
206 where
207 P: AsRef<std::path::Path>,
208 {
209 let resolved_provider = ResolvedProvider::from_json(&path)?;
210 self.provider_sources
211 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
212 Ok(self)
213 }
214
215 pub fn try_from_json<P>(mut self, path: P) -> Self
219 where
220 P: AsRef<std::path::Path>,
221 {
222 match ResolvedProvider::from_json(&path) {
223 Ok(resolved_provider) => {
224 self.provider_sources
225 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
226 }
227 Err(e) => {
228 println!(
229 "Unable to read JSON configuration file '{}': {}. Ignoring.",
230 path.as_ref().to_string_lossy(),
231 e
232 );
233 }
234 }
235 self
236 }
237
238 pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
249 if prefix.is_empty() {
250 return Err(ConfigurationError::EmptyPrefix);
251 }
252
253 let prefix = if prefix.ends_with('_') {
254 prefix.to_string()
255 } else {
256 format!("{}_", prefix)
257 };
258
259 let env = Env::prefixed(&prefix).split("__");
261 let values = env.data().unwrap();
262 if let Some(default_dict) = values.get(&figment::Profile::Default) {
263 self.provider_sources
264 .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
265 default_dict.clone(),
266 )))));
267 self.lookup_sources.insert(LookupSource::Environment { prefix });
268 }
269 Ok(self)
270 }
271
272 pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
337 let initial_figment = build_figment_from_sources(&self.provider_sources);
338
339 if !initial_figment.contains("secret_backend_command") {
341 debug!("No secrets backend configured; skipping secrets resolution.");
342 return Ok(self);
343 }
344
345 let resolver_config = initial_figment.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
346 let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
347 .await
348 .context(Secrets)?;
349
350 let provider = secrets::Provider::new(resolver, &initial_figment)
351 .await
352 .context(Secrets)?;
353
354 self.provider_sources
355 .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
356 Ok(self)
357 }
358
359 pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
363 self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
364 self
365 }
366
367 pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
373 where
374 T: Deserialize<'a>,
375 {
376 let figment = build_figment_from_sources(&self.provider_sources);
377 figment.extract().map_err(Into::into)
378 }
379
380 pub fn bootstrap_generic(&self) -> GenericConfiguration {
385 let figment = build_figment_from_sources(&self.provider_sources);
386
387 GenericConfiguration {
388 inner: Arc::new(Inner {
389 figment: RwLock::new(figment),
390 lookup_sources: self.lookup_sources.clone(),
391 event_sender: None,
392 ready_signal: Mutex::new(None),
393 }),
394 }
395 }
396
397 pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
399 let has_dynamic_provider = self
400 .provider_sources
401 .iter()
402 .any(|s| matches!(s, ProviderSource::Dynamic(_)));
403
404 if has_dynamic_provider {
405 let mut receiver_opt = None;
406 for source in self.provider_sources.iter_mut() {
407 if let ProviderSource::Dynamic(ref mut receiver) = source {
408 receiver_opt = receiver.take();
409 break;
410 }
411 }
412 let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
413
414 let figment = build_figment_from_sources(&self.provider_sources);
416
417 let (event_sender, _) = broadcast::channel(100);
418 let (ready_sender, ready_receiver) = oneshot::channel();
419
420 let generic_config = GenericConfiguration {
421 inner: Arc::new(Inner {
422 figment: RwLock::new(figment),
423 lookup_sources: self.lookup_sources,
424 event_sender: Some(event_sender.clone()),
425 ready_signal: Mutex::new(Some(ready_receiver)),
426 }),
427 };
428
429 tokio::spawn(run_dynamic_config_updater(
431 generic_config.inner.clone(),
432 receiver,
433 self.provider_sources,
434 event_sender,
435 ready_sender,
436 ));
437
438 Ok(generic_config)
439 } else {
440 let figment = build_figment_from_sources(&self.provider_sources);
442
443 Ok(GenericConfiguration {
444 inner: Arc::new(Inner {
445 figment: RwLock::new(figment),
446 lookup_sources: self.lookup_sources,
447 event_sender: None,
448 ready_signal: Mutex::new(None),
449 }),
450 })
451 }
452 }
453
454 pub async fn for_tests(
465 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
466 enable_dynamic_configuration: bool,
467 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
468 let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
469 let path = &json_file.path();
470 let json_to_write = file_values.unwrap_or(serde_json::json!({}));
471 serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
472
473 let mut loader = ConfigurationLoader::default().try_from_json(path);
474 let mut maybe_sender = None;
475 if enable_dynamic_configuration {
476 let (sender, receiver) = tokio::sync::mpsc::channel(1);
477 loader = loader.with_dynamic_configuration(receiver);
478 maybe_sender = Some(sender);
479 }
480
481 static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
482
483 let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
484
485 if let Some(pairs) = env_vars.as_ref() {
486 for (k, v) in pairs.iter() {
487 std::env::set_var(format!("TEST_{}", k), v);
488 }
489 }
490
491 let loader = loader
493 .from_environment("TEST")
494 .expect("should not fail to add environment provider");
495
496 if let Some(pairs) = env_vars.as_ref() {
498 for (k, _) in pairs.iter() {
499 std::env::remove_var(k);
500 }
501 }
502
503 drop(guard);
504
505 let cfg = loader
506 .into_generic()
507 .await
508 .expect("should not fail to build generic configuration");
509
510 (cfg, maybe_sender)
511 }
512}
513
514fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
515 sources.iter().fold(Figment::new(), |figment, source| match source {
516 ProviderSource::Static(p) => figment.admerge(p.clone()),
517 ProviderSource::Dynamic(_) => figment,
519 })
520}
521
522fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
526 if !root.is_object() {
527 *root = serde_json::Value::Object(serde_json::Map::new());
528 }
529
530 let mut current = root;
531 let mut segments = key.split('.').peekable();
533
534 while let Some(seg) = segments.next() {
535 let is_leaf = segments.peek().is_none();
536
537 if !current.is_object() {
539 *current = serde_json::Value::Object(serde_json::Map::new());
540 }
541 let node = current.as_object_mut().expect("current node should be an object");
542
543 if is_leaf {
544 node.insert(seg.to_string(), value);
545 break;
546 } else {
547 let should_create_node = match node.get(seg) {
549 Some(v) => !v.is_object(),
550 None => true,
551 };
552 if should_create_node {
554 node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
555 }
556
557 current = node.get_mut(seg).expect("should not fail to get nested object");
559 }
560 }
561}
562
563async fn run_dynamic_config_updater(
564 inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
565 sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
566) {
567 let initial_update = match receiver.recv().await {
569 Some(update) => update,
570 None => {
571 debug!("Dynamic configuration channel closed before initial snapshot.");
573 return;
574 }
575 };
576
577 let mut dynamic_state = match initial_update {
578 ConfigUpdate::Snapshot(state) => state,
579 ConfigUpdate::Partial { .. } => {
580 error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
582 serde_json::Value::Null
583 }
584 };
585
586 let new_figment = provider_sources
588 .iter()
589 .fold(Figment::new(), |figment, source| match source {
590 ProviderSource::Static(p) => figment.admerge(p.clone()),
591 ProviderSource::Dynamic(_) => {
592 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
593 }
594 });
595
596 {
598 let mut figment_guard = inner.figment.write().unwrap();
599 *figment_guard = new_figment.clone();
600 }
601
602 if ready_sender.send(()).is_err() {
604 debug!("Configuration readiness receiver dropped. Updater task shutting down.");
605 return;
606 }
607
608 let mut current_config: figment::value::Value = new_figment.extract().unwrap();
610
611 loop {
613 let update = match receiver.recv().await {
614 Some(update) => update,
615 None => {
616 debug!("Dynamic configuration update channel closed. Updater task shutting down.");
618 return;
619 }
620 };
621
622 match update {
624 ConfigUpdate::Snapshot(new_state) => {
625 debug!("Received configuration snapshot update.");
626 dynamic_state = new_state;
627 }
628 ConfigUpdate::Partial { key, value } => {
629 debug!(%key, "Received partial configuration update.");
630 if dynamic_state.is_null() {
631 dynamic_state = serde_json::Value::Object(serde_json::Map::new());
632 }
633 if dynamic_state.is_object() {
634 upsert(&mut dynamic_state, &key, value);
635 } else {
636 error!(
637 "Received partial update but current dynamic state is not an object. This should not happen."
638 );
639 }
640 }
641 }
642
643 let new_figment = provider_sources
645 .iter()
646 .fold(Figment::new(), |figment, source| match source {
647 ProviderSource::Static(p) => figment.admerge(p.clone()),
648 ProviderSource::Dynamic(_) => {
649 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
650 }
651 });
652
653 let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
654
655 if current_config != new_config {
656 for change in dynamic::diff_config(¤t_config, &new_config) {
657 let _ = sender.send(change);
661 }
662
663 let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
664 error!("Failed to acquire write lock for dynamic configuration: {}", e);
665 e.into_inner()
666 });
667 *figment_guard = new_figment;
668
669 current_config = new_config;
671 }
672 }
673}
674
/// State shared by all clones of a `GenericConfiguration` and by the dynamic updater task.
#[derive(Debug)]
struct Inner {
    // The current merged configuration; the updater task replaces it when the effective configuration changes.
    figment: RwLock<Figment>,
    // Alternative places a key can be set from; used to build the help text of missing-field errors.
    lookup_sources: HashSet<LookupSource>,
    // Sender half of the change-event broadcast channel; `None` when dynamic configuration is not enabled.
    event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
    // Receiver of the updater task's readiness signal; holds `None` when dynamic configuration is not enabled or
    // once readiness has been consumed.
    ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
}
682
/// A handle to the merged configuration that supports untyped, key-based lookups.
///
/// Clones are cheap and share the same underlying state, so updates applied by the dynamic updater task are
/// visible through every clone.
#[derive(Clone, Debug)]
pub struct GenericConfiguration {
    // Shared state; also held by the dynamic updater task when one is running.
    inner: Arc<Inner>,
}
708
709impl GenericConfiguration {
710 pub async fn ready(&self) {
717 let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
720 if let Some(ready_rx) = maybe_ready_rx.take() {
721 if ready_rx.await.is_err() {
723 error!("Failed to receive configuration readiness signal; updater task may have panicked.");
724 }
725 }
726 }
727
728 fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
729 where
730 T: Deserialize<'a>,
731 {
732 let figment_guard = self.inner.figment.read().unwrap();
733 match figment_guard.extract_inner(key) {
734 Ok(value) => Ok(value),
735 Err(e) => {
736 if matches!(e.kind, figment::error::Kind::MissingField(_)) {
737 let fallback_key = key.replace('.', "_");
742 figment_guard
743 .extract_inner(&fallback_key)
744 .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
745 } else {
746 Err(e.into())
747 }
748 }
749 }
750 }
751
752 pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
761 where
762 T: Deserialize<'a>,
763 {
764 self.get(key)
765 }
766
767 pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
774 where
775 T: Default + Deserialize<'a>,
776 {
777 self.get(key).unwrap_or_default()
778 }
779
780 pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
791 where
792 T: Deserialize<'a>,
793 {
794 match self.get(key) {
795 Ok(value) => Ok(Some(value)),
796 Err(ConfigurationError::MissingField { .. }) => Ok(None),
797 Err(e) => Err(e),
798 }
799 }
800
801 pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
807 where
808 T: Deserialize<'a>,
809 {
810 self.inner
811 .figment
812 .read()
813 .unwrap()
814 .extract()
815 .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
816 }
817
818 pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
820 self.inner.event_sender.as_ref().map(|s| s.subscribe())
821 }
822
823 pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
828 FieldUpdateWatcher {
829 key: key.to_string(),
830 rx: self.subscribe_for_updates(),
831 }
832 }
833}
834
835fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
836 match e.kind {
837 Kind::MissingField(field) => {
838 let mut valid_keys = lookup_sources
839 .iter()
840 .map(|source| source.transform_key(&field))
841 .collect::<Vec<_>>();
842
843 valid_keys.insert(0, field.to_string());
845
846 let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
847
848 ConfigurationError::MissingField { help_text, field }
849 }
850 Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
851 field: e.path.join("."),
852 expected_ty,
853 actual_ty: actual_ty.to_string(),
854 },
855 _ => ConfigurationError::Generic { source: e.into() },
856 }
857}
858
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// File-backed values shared by most tests.
    fn sample_file_values() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Environment variables (without the `TEST_` prefix) shared by most tests.
    fn sample_env_vars() -> [(String, String); 1] {
        [("ENV_VAR".to_string(), "from_env".to_string())]
    }

    /// Sends `update` to the dynamic configuration channel, panicking if the channel is closed.
    async fn send_update(sender: &mpsc::Sender<ConfigUpdate>, update: ConfigUpdate) {
        sender.send(update).await.unwrap();
    }

    /// Waits up to two seconds for a change event for `key`, skipping events for other keys.
    async fn wait_for_update(
        rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str, timeout_msg: &str,
    ) -> ConfigChangeEvent {
        let matching_event = async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break ev,
                    Err(e) => panic!("updates channel closed: {e}"),
                    Ok(_) => continue,
                }
            }
        };

        tokio::time::timeout(Duration::from_secs(2), matching_event)
            .await
            .expect(timeout_msg)
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, _) = ConfigurationLoader::for_tests(Some(sample_file_values()), Some(&env_vars[..]), false).await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), Some(&env_vars[..]), true).await;
        let sender = sender.expect("sender should exist");
        send_update(
            &sender,
            ConfigUpdate::Snapshot(serde_json::json!({
                "new": "from_snapshot",
            })),
        )
        .await;

        cfg.ready().await;

        // Values from the file are still visible after the snapshot was applied.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Values from the snapshot are visible as well.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        send_update(
            &sender,
            ConfigUpdate::Partial {
                key: "new_key".to_string(),
                value: serde_json::Value::from("from dynamic update".to_string()),
            },
        )
        .await;

        wait_for_update(&mut rx, "new_key", "timed out waiting for new_key update").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // A partial update of a nested key must leave its siblings untouched.
        send_update(
            &sender,
            ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(true),
            },
        )
        .await;

        wait_for_update(&mut rx, "foobar.a", "timed out waiting for foobar.a update").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let env_vars = sample_env_vars();
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), Some(&env_vars[..]), true).await;
        let sender = sender.expect("sender should exist");

        send_update(
            &sender,
            ConfigUpdate::Snapshot(serde_json::json!({
                "env_var": "from_snapshot_env_var"
            })),
        )
        .await;

        cfg.ready().await;

        // The environment value wins over the snapshot value.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        let follow_up_updates = [
            ("env_var", serde_json::json!("from_partial")),
            ("foobar.a", serde_json::json!(false)),
            // Marker update: once its event arrives, the earlier updates have been processed too.
            ("dummy", serde_json::json!(1)),
        ];
        for (key, value) in follow_up_updates {
            send_update(
                &sender,
                ConfigUpdate::Partial {
                    key: key.to_string(),
                    value,
                },
            )
            .await;
        }

        wait_for_update(&mut rx, "dummy", "timed out waiting for sync marker").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), None, true).await;
        let sender = sender.expect("sender should exist");

        send_update(&sender, ConfigUpdate::Snapshot(serde_json::json!({}))).await;
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        send_update(
            &sender,
            ConfigUpdate::Partial {
                key: "new_parent.new_child".to_string(),
                value: serde_json::json!(42),
            },
        )
        .await;

        // The change is reported for the newly created top-level key.
        wait_for_update(
            &mut rx,
            "new_parent",
            "timed out waiting for new_parent.new_child update",
        )
        .await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let env_vars = [("RANDOM_KEY".to_string(), "from_env_only".to_string())];
        let (cfg, _) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), Some(&env_vars[..]), false).await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        // Keep the sender alive so the updater keeps waiting for its first message.
        assert!(maybe_sender.is_some());

        let res = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}