1#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10 error::Kind,
11 providers::{Env, Serialized},
12 Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32 fn metadata(&self) -> figment::Metadata {
33 self.0.metadata()
34 }
35
36 fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37 self.0.data()
38 }
39}
40
41enum ProviderSource {
42 Static(ArcProvider),
43 Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47 fn clone(&self) -> Self {
48 match self {
49 Self::Static(p) => Self::Static(p.clone()),
50 Self::Dynamic(_) => Self::Dynamic(None),
51 }
52 }
53}
54
55#[derive(Debug, Snafu)]
57#[snafu(context(suffix(false)))]
58pub enum ConfigurationError {
59 #[snafu(display("Environment variable prefix must not be empty."))]
61 EmptyPrefix,
62
63 #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
65 MissingField {
66 help_text: String,
71
72 field: Cow<'static, str>,
74 },
75
76 #[snafu(display(
78 "Expected value for field '{}' to be '{}', got '{}' instead.",
79 field,
80 expected_ty,
81 actual_ty
82 ))]
83 InvalidFieldType {
84 field: String,
88
89 expected_ty: String,
91
92 actual_ty: String,
94 },
95
96 #[snafu(display("Failed to query configuration."))]
98 Generic {
99 source: GenericError,
101 },
102
103 #[snafu(display("Failed to resolve secrets."))]
105 Secrets {
106 source: secrets::Error,
108 },
109}
110
111impl From<figment::Error> for ConfigurationError {
112 fn from(e: figment::Error) -> Self {
113 match e.kind {
114 Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115 field: e.path.join("."),
116 expected_ty,
117 actual_ty: actual_ty.to_string(),
118 },
119 _ => Self::Generic { source: e.into() },
120 }
121 }
122}
123
/// A place other than the configuration key itself where a value can be supplied.
///
/// Used to translate a configuration key into the name it has in that place.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// Environment variables.
    Environment {
        /// Text prepended verbatim to every translated key.
        prefix: String,
    },
}

impl LookupSource {
    /// Translates a configuration key into the name used by this source.
    ///
    /// For environment variables, every `.` in `key` is replaced by `_`, the result is upper-cased,
    /// and the prefix is prepended as-is.
    fn transform_key(&self, key: &str) -> String {
        // Irrefutable today; adding a variant turns this into a compile error that forces a revisit.
        let Self::Environment { prefix } = self;

        let variable = key.replace('.', "_").to_uppercase();
        let mut transformed = String::with_capacity(prefix.len() + variable.len());
        transformed.push_str(prefix);
        transformed.push_str(&variable);
        transformed
    }
}
139
140#[derive(Clone, Default)]
156pub struct ConfigurationLoader {
157 lookup_sources: HashSet<LookupSource>,
158 provider_sources: Vec<ProviderSource>,
159}
160
161impl ConfigurationLoader {
162 pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
168 where
169 P: AsRef<std::path::Path>,
170 {
171 let resolved_provider = ResolvedProvider::from_yaml(&path).context(Generic)?;
172 self.provider_sources
173 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
174 Ok(self)
175 }
176
177 pub fn try_from_yaml<P>(mut self, path: P) -> Self
181 where
182 P: AsRef<std::path::Path>,
183 {
184 match ResolvedProvider::from_yaml(&path) {
185 Ok(resolved_provider) => {
186 self.provider_sources
187 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
188 }
189 Err(e) => {
190 println!(
191 "Unable to read YAML configuration file '{}': {}. Ignoring.",
192 path.as_ref().to_string_lossy(),
193 e
194 );
195 }
196 }
197 self
198 }
199
200 pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
206 where
207 P: AsRef<std::path::Path>,
208 {
209 let resolved_provider = ResolvedProvider::from_json(&path).context(Generic)?;
210 self.provider_sources
211 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
212 Ok(self)
213 }
214
215 pub fn try_from_json<P>(mut self, path: P) -> Self
219 where
220 P: AsRef<std::path::Path>,
221 {
222 match ResolvedProvider::from_json(&path) {
223 Ok(resolved_provider) => {
224 self.provider_sources
225 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
226 }
227 Err(e) => {
228 println!(
229 "Unable to read JSON configuration file '{}': {}. Ignoring.",
230 path.as_ref().to_string_lossy(),
231 e
232 );
233 }
234 }
235 self
236 }
237
238 pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
249 if prefix.is_empty() {
250 return Err(ConfigurationError::EmptyPrefix);
251 }
252
253 let prefix = if prefix.ends_with('_') {
254 prefix.to_string()
255 } else {
256 format!("{}_", prefix)
257 };
258
259 let env = Env::prefixed(&prefix);
261 let values = env.data().unwrap();
262 if let Some(default_dict) = values.get(&figment::Profile::Default) {
263 self.provider_sources
264 .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
265 default_dict.clone(),
266 )))));
267 self.lookup_sources.insert(LookupSource::Environment { prefix });
268 }
269 Ok(self)
270 }
271
272 pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
337 let initial_figment = build_figment_from_sources(&self.provider_sources);
338
339 if !initial_figment.contains("secret_backend_command") {
341 debug!("No secrets backend configured; skipping secrets resolution.");
342 return Ok(self);
343 }
344
345 let resolver_config = initial_figment.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
346 let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
347 .await
348 .context(Secrets)?;
349
350 let provider = secrets::Provider::new(resolver, &initial_figment)
351 .await
352 .context(Secrets)?;
353
354 self.provider_sources
355 .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
356 Ok(self)
357 }
358
359 pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
363 self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
364 self
365 }
366
367 pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
373 where
374 T: Deserialize<'a>,
375 {
376 let figment = build_figment_from_sources(&self.provider_sources);
377 figment.extract().map_err(Into::into)
378 }
379
380 pub fn bootstrap_generic(&self) -> GenericConfiguration {
385 let figment = build_figment_from_sources(&self.provider_sources);
386
387 GenericConfiguration {
388 inner: Arc::new(Inner {
389 figment: RwLock::new(figment),
390 lookup_sources: self.lookup_sources.clone(),
391 event_sender: None,
392 ready_signal: Mutex::new(None),
393 }),
394 }
395 }
396
397 pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
399 let has_dynamic_provider = self
400 .provider_sources
401 .iter()
402 .any(|s| matches!(s, ProviderSource::Dynamic(_)));
403
404 if has_dynamic_provider {
405 let mut receiver_opt = None;
406 for source in self.provider_sources.iter_mut() {
407 if let ProviderSource::Dynamic(ref mut receiver) = source {
408 receiver_opt = receiver.take();
409 break;
410 }
411 }
412 let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
413
414 let figment = build_figment_from_sources(&self.provider_sources);
416
417 let (event_sender, _) = broadcast::channel(100);
418 let (ready_sender, ready_receiver) = oneshot::channel();
419
420 let generic_config = GenericConfiguration {
421 inner: Arc::new(Inner {
422 figment: RwLock::new(figment),
423 lookup_sources: self.lookup_sources,
424 event_sender: Some(event_sender.clone()),
425 ready_signal: Mutex::new(Some(ready_receiver)),
426 }),
427 };
428
429 tokio::spawn(run_dynamic_config_updater(
431 generic_config.inner.clone(),
432 receiver,
433 self.provider_sources,
434 event_sender,
435 ready_sender,
436 ));
437
438 Ok(generic_config)
439 } else {
440 let figment = build_figment_from_sources(&self.provider_sources);
442
443 Ok(GenericConfiguration {
444 inner: Arc::new(Inner {
445 figment: RwLock::new(figment),
446 lookup_sources: self.lookup_sources,
447 event_sender: None,
448 ready_signal: Mutex::new(None),
449 }),
450 })
451 }
452 }
453
454 pub async fn for_tests(
465 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
466 enable_dynamic_configuration: bool,
467 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
468 let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
469 let path = &json_file.path();
470 let json_to_write = file_values.unwrap_or(serde_json::json!({}));
471 serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
472
473 let mut loader = ConfigurationLoader::default().try_from_json(path);
474 let mut maybe_sender = None;
475 if enable_dynamic_configuration {
476 let (sender, receiver) = tokio::sync::mpsc::channel(1);
477 loader = loader.with_dynamic_configuration(receiver);
478 maybe_sender = Some(sender);
479 }
480
481 static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
482
483 let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
484
485 if let Some(pairs) = env_vars.as_ref() {
486 for (k, v) in pairs.iter() {
487 std::env::set_var(format!("TEST_{}", k), v);
488 }
489 }
490
491 let loader = loader
493 .from_environment("TEST")
494 .expect("should not fail to add environment provider");
495
496 if let Some(pairs) = env_vars.as_ref() {
498 for (k, _) in pairs.iter() {
499 std::env::remove_var(k);
500 }
501 }
502
503 drop(guard);
504
505 let cfg = loader
506 .into_generic()
507 .await
508 .expect("should not fail to build generic configuration");
509
510 (cfg, maybe_sender)
511 }
512}
513
514fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
515 sources.iter().fold(Figment::new(), |figment, source| match source {
516 ProviderSource::Static(p) => figment.admerge(p.clone()),
517 ProviderSource::Dynamic(_) => figment,
519 })
520}
521
522fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
526 if !root.is_object() {
527 *root = serde_json::Value::Object(serde_json::Map::new());
528 }
529
530 let mut current = root;
531 let mut segments = key.split('.').peekable();
533
534 while let Some(seg) = segments.next() {
535 let is_leaf = segments.peek().is_none();
536
537 if !current.is_object() {
539 *current = serde_json::Value::Object(serde_json::Map::new());
540 }
541 let node = current.as_object_mut().expect("current node should be an object");
542
543 if is_leaf {
544 node.insert(seg.to_string(), value);
545 break;
546 } else {
547 let should_create_node = match node.get(seg) {
549 Some(v) => !v.is_object(),
550 None => true,
551 };
552 if should_create_node {
554 node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
555 }
556
557 current = node.get_mut(seg).expect("should not fail to get nested object");
559 }
560 }
561}
562
563async fn run_dynamic_config_updater(
564 inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
565 sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
566) {
567 let initial_update = match receiver.recv().await {
569 Some(update) => update,
570 None => {
571 debug!("Dynamic configuration channel closed before initial snapshot.");
573 return;
574 }
575 };
576
577 let mut dynamic_state = match initial_update {
578 ConfigUpdate::Snapshot(state) => state,
579 ConfigUpdate::Partial { .. } => {
580 error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
582 serde_json::Value::Null
583 }
584 };
585
586 let new_figment = provider_sources
588 .iter()
589 .fold(Figment::new(), |figment, source| match source {
590 ProviderSource::Static(p) => figment.admerge(p.clone()),
591 ProviderSource::Dynamic(_) => {
592 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
593 }
594 });
595
596 {
598 let mut figment_guard = inner.figment.write().unwrap();
599 *figment_guard = new_figment.clone();
600 }
601
602 if ready_sender.send(()).is_err() {
604 debug!("Configuration readiness receiver dropped. Updater task shutting down.");
605 return;
606 }
607
608 let mut current_config: figment::value::Value = new_figment.extract().unwrap();
610
611 loop {
613 let update = match receiver.recv().await {
614 Some(update) => update,
615 None => {
616 debug!("Dynamic configuration update channel closed. Updater task shutting down.");
618 return;
619 }
620 };
621
622 match update {
624 ConfigUpdate::Snapshot(new_state) => {
625 dynamic_state = new_state;
626 }
627 ConfigUpdate::Partial { key, value } => {
628 if dynamic_state.is_null() {
629 dynamic_state = serde_json::Value::Object(serde_json::Map::new());
630 }
631 if dynamic_state.is_object() {
632 upsert(&mut dynamic_state, &key, value);
633 } else {
634 error!(
635 "Received partial update but current dynamic state is not an object. This should not happen."
636 );
637 }
638 }
639 }
640
641 let new_figment = provider_sources
643 .iter()
644 .fold(Figment::new(), |figment, source| match source {
645 ProviderSource::Static(p) => figment.admerge(p.clone()),
646 ProviderSource::Dynamic(_) => {
647 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
648 }
649 });
650
651 let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
652
653 if current_config != new_config {
654 for change in dynamic::diff_config(¤t_config, &new_config) {
655 let _ = sender.send(change);
659 }
660
661 let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
662 error!("Failed to acquire write lock for dynamic configuration: {}", e);
663 e.into_inner()
664 });
665 *figment_guard = new_figment;
666
667 current_config = new_config;
669 }
670 }
671}
672
673#[derive(Debug)]
674struct Inner {
675 figment: RwLock<Figment>,
676 lookup_sources: HashSet<LookupSource>,
677 event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
678 ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
679}
680
681#[derive(Clone, Debug)]
703pub struct GenericConfiguration {
704 inner: Arc<Inner>,
705}
706
707impl GenericConfiguration {
708 pub async fn ready(&self) {
715 let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
718 if let Some(ready_rx) = maybe_ready_rx.take() {
719 if ready_rx.await.is_err() {
721 error!("Failed to receive configuration readiness signal; updater task may have panicked.");
722 }
723 }
724 }
725
726 fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
727 where
728 T: Deserialize<'a>,
729 {
730 let figment_guard = self.inner.figment.read().unwrap();
731 match figment_guard.extract_inner(key) {
732 Ok(value) => Ok(value),
733 Err(e) => {
734 if matches!(e.kind, figment::error::Kind::MissingField(_)) {
735 let fallback_key = key.replace('.', "_");
740 figment_guard
741 .extract_inner(&fallback_key)
742 .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
743 } else {
744 Err(e.into())
745 }
746 }
747 }
748 }
749
750 pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
759 where
760 T: Deserialize<'a>,
761 {
762 self.get(key)
763 }
764
765 pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
772 where
773 T: Default + Deserialize<'a>,
774 {
775 self.get(key).unwrap_or_default()
776 }
777
778 pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
789 where
790 T: Deserialize<'a>,
791 {
792 match self.get(key) {
793 Ok(value) => Ok(Some(value)),
794 Err(ConfigurationError::MissingField { .. }) => Ok(None),
795 Err(e) => Err(e),
796 }
797 }
798
799 pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
805 where
806 T: Deserialize<'a>,
807 {
808 self.inner
809 .figment
810 .read()
811 .unwrap()
812 .extract()
813 .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
814 }
815
816 pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
818 self.inner.event_sender.as_ref().map(|s| s.subscribe())
819 }
820
821 pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
826 FieldUpdateWatcher {
827 key: key.to_string(),
828 rx: self.subscribe_for_updates(),
829 }
830 }
831}
832
833fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
834 match e.kind {
835 Kind::MissingField(field) => {
836 let mut valid_keys = lookup_sources
837 .iter()
838 .map(|source| source.transform_key(&field))
839 .collect::<Vec<_>>();
840
841 valid_keys.insert(0, field.to_string());
843
844 let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
845
846 ConfigurationError::MissingField { help_text, field }
847 }
848 Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
849 field: e.path.join("."),
850 expected_ty,
851 actual_ty: actual_ty.to_string(),
852 },
853 _ => ConfigurationError::Generic { source: e.into() },
854 }
855}
856
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Longest time a test waits for a single change event.
    const UPDATE_TIMEOUT: Duration = Duration::from_secs(2);

    /// File-based configuration shared by most tests.
    fn sample_file_values() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Environment variables (without the `TEST_` prefix) shared by most tests.
    fn sample_env_vars() -> Vec<(String, String)> {
        vec![("ENV_VAR".to_string(), "from_env".to_string())]
    }

    /// Waits until a change event for `key` arrives, skipping events for other keys.
    ///
    /// Panics with `timeout_message` when no such event arrives in time, or when the channel fails.
    async fn wait_for_change(rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str, timeout_message: &str) {
        let matching_event = async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break,
                    Err(e) => panic!("updates channel closed: {e}"),
                    Ok(_) => continue,
                }
            }
        };

        tokio::time::timeout(UPDATE_TIMEOUT, matching_event)
            .await
            .expect(timeout_message);
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, _) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), false).await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, sender) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), true).await;
        let sender = sender.expect("sender should exist");

        let snapshot = serde_json::json!({
            "new": "from_snapshot",
        });
        sender.send(ConfigUpdate::Snapshot(snapshot)).await.unwrap();

        cfg.ready().await;

        // Static values are still present, and the snapshot's values were added.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // A partial update can add a new top-level key.
        let add_key = ConfigUpdate::Partial {
            key: "new_key".to_string(),
            value: serde_json::json!("from dynamic update"),
        };
        sender.send(add_key).await.unwrap();
        wait_for_change(&mut rx, "new_key", "timed out waiting for new_key update").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // A partial update to a nested key must leave its siblings alone.
        let flip_nested = ConfigUpdate::Partial {
            key: "foobar.a".to_string(),
            value: serde_json::json!(true),
        };
        sender.send(flip_nested).await.unwrap();
        wait_for_change(&mut rx, "foobar.a", "timed out waiting for foobar.a update").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let env_vars = sample_env_vars();
        let (cfg, sender) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), true).await;
        let sender = sender.expect("sender should exist");

        let snapshot = serde_json::json!({
            "env_var": "from_snapshot_env_var"
        });
        sender.send(ConfigUpdate::Snapshot(snapshot)).await.unwrap();

        cfg.ready().await;

        // The snapshot must not override the environment.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // Neither must a partial update. The last update only serves as a marker: once its event has
        // been seen, the earlier updates have been processed as well.
        let updates = [
            ("env_var", serde_json::json!("from_partial")),
            ("foobar.a", serde_json::json!(false)),
            ("dummy", serde_json::json!(1)),
        ];
        for (key, value) in updates {
            sender
                .send(ConfigUpdate::Partial {
                    key: key.to_string(),
                    value,
                })
                .await
                .unwrap();
        }

        wait_for_change(&mut rx, "dummy", "timed out waiting for sync marker").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), None, true).await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
            .await
            .unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        let add_nested = ConfigUpdate::Partial {
            key: "new_parent.new_child".to_string(),
            value: serde_json::json!(42),
        };
        sender.send(add_nested).await.unwrap();

        // The event awaited here is the one keyed by the newly created parent.
        wait_for_change(
            &mut rx,
            "new_parent",
            "timed out waiting for new_parent.new_child update",
        )
        .await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let env_vars = [("RANDOM_KEY".to_string(), "from_env_only".to_string())];
        let (cfg, _) =
            ConfigurationLoader::for_tests(Some(serde_json::json!({})), Some(env_vars.as_slice()), false).await;
        cfg.ready().await;

        // `random.key` only exists as the flat `random_key` coming from the environment.
        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        // The sender stays alive (but unused) for the whole test, so the channel remains open.
        assert!(maybe_sender.is_some());

        let res = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}