1#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10 error::Kind,
11 providers::{Env, Serialized},
12 Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32 fn metadata(&self) -> figment::Metadata {
33 self.0.metadata()
34 }
35
36 fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37 self.0.data()
38 }
39}
40
41enum ProviderSource {
42 Static(ArcProvider),
43 Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47 fn clone(&self) -> Self {
48 match self {
49 Self::Static(p) => Self::Static(p.clone()),
50 Self::Dynamic(_) => Self::Dynamic(None),
51 }
52 }
53}
54
55#[derive(Debug, Snafu)]
57#[snafu(context(suffix(false)))]
58pub enum ConfigurationError {
59 #[snafu(display("Environment variable prefix must not be empty."))]
61 EmptyPrefix,
62
63 #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
65 MissingField {
66 help_text: String,
71
72 field: Cow<'static, str>,
74 },
75
76 #[snafu(display(
78 "Expected value for field '{}' to be '{}', got '{}' instead.",
79 field,
80 expected_ty,
81 actual_ty
82 ))]
83 InvalidFieldType {
84 field: String,
88
89 expected_ty: String,
91
92 actual_ty: String,
94 },
95
96 #[snafu(transparent)]
98 Generic {
99 source: GenericError,
101 },
102
103 #[snafu(display("Failed to resolve secrets."))]
105 Secrets {
106 source: secrets::Error,
108 },
109}
110
111impl From<figment::Error> for ConfigurationError {
112 fn from(e: figment::Error) -> Self {
113 match e.kind {
114 Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115 field: e.path.join("."),
116 expected_ty,
117 actual_ty: actual_ty.to_string(),
118 },
119 _ => Self::Generic { source: e.into() },
120 }
121 }
122}
123
/// A place a configuration key can be set from, used to build "try setting ..." hints.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// Environment variables carrying the given prefix.
    Environment { prefix: String },
}

impl LookupSource {
    /// Translates a configuration key into the name it would have in this source.
    ///
    /// For the environment, dots become underscores, the key is uppercased, and the prefix is
    /// prepended as-is.
    fn transform_key(&self, key: &str) -> String {
        // Irrefutable today; adding a variant turns this into a compile error that forces an update.
        let Self::Environment { prefix } = self;
        let env_name = key.replace('.', "_").to_uppercase();
        format!("{prefix}{env_name}")
    }
}
139
140#[derive(Clone, Default)]
156pub struct ConfigurationLoader {
157 key_aliases: &'static [(&'static str, &'static str)],
158 lookup_sources: HashSet<LookupSource>,
159 provider_sources: Vec<ProviderSource>,
160}
161
162impl ConfigurationLoader {
163 pub fn with_key_aliases(mut self, aliases: &'static [(&'static str, &'static str)]) -> Self {
172 self.key_aliases = aliases;
173 self
174 }
175
176 pub fn add_providers<P, I>(mut self, providers: I) -> Self
186 where
187 P: Provider + Send + Sync + 'static,
188 I: IntoIterator<Item = P>,
189 {
190 for p in providers {
191 self.provider_sources
192 .push(ProviderSource::Static(ArcProvider(Arc::new(p))));
193 }
194 self
195 }
196
197 pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
203 where
204 P: AsRef<std::path::Path>,
205 {
206 let resolved_provider = ResolvedProvider::from_yaml(&path, self.key_aliases)?;
207 self.provider_sources
208 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
209 Ok(self)
210 }
211
212 pub fn try_from_yaml<P>(mut self, path: P) -> Self
216 where
217 P: AsRef<std::path::Path>,
218 {
219 match ResolvedProvider::from_yaml(&path, self.key_aliases) {
220 Ok(resolved_provider) => {
221 self.provider_sources
222 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
223 }
224 Err(e) => {
225 println!(
226 "Unable to read YAML configuration file '{}': {}. Ignoring.",
227 path.as_ref().to_string_lossy(),
228 e
229 );
230 }
231 }
232 self
233 }
234
235 pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
241 where
242 P: AsRef<std::path::Path>,
243 {
244 let resolved_provider = ResolvedProvider::from_json(&path, self.key_aliases)?;
245 self.provider_sources
246 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
247 Ok(self)
248 }
249
250 pub fn try_from_json<P>(mut self, path: P) -> Self
254 where
255 P: AsRef<std::path::Path>,
256 {
257 match ResolvedProvider::from_json(&path, self.key_aliases) {
258 Ok(resolved_provider) => {
259 self.provider_sources
260 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
261 }
262 Err(e) => {
263 println!(
264 "Unable to read JSON configuration file '{}': {}. Ignoring.",
265 path.as_ref().to_string_lossy(),
266 e
267 );
268 }
269 }
270 self
271 }
272
273 pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
286 if prefix.is_empty() {
287 return Err(ConfigurationError::EmptyPrefix);
288 }
289
290 let prefix = if prefix.ends_with('_') {
291 prefix.to_string()
292 } else {
293 format!("{}_", prefix)
294 };
295
296 let env = Env::prefixed(&prefix).split("__");
298 let values = env.data().unwrap();
299 if let Some(default_dict) = values.get(&figment::Profile::Default) {
300 self.provider_sources
301 .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
302 default_dict.clone(),
303 )))));
304 self.lookup_sources.insert(LookupSource::Environment { prefix });
305 }
306 Ok(self)
307 }
308
309 pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
374 let configuration = build_figment_from_sources(&self.provider_sources);
375
376 if !has_valid_secret_backend_command(&configuration) {
378 debug!("No secrets backend configured; skipping secrets resolution.");
379 return Ok(self);
380 }
381
382 let resolver_config = configuration.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
383 let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
384 .await
385 .context(Secrets)?;
386
387 let provider = secrets::Provider::new(resolver, &configuration)
388 .await
389 .context(Secrets)?;
390
391 self.provider_sources
392 .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
393 Ok(self)
394 }
395
396 pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
400 self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
401 self
402 }
403
404 pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
410 where
411 T: Deserialize<'a>,
412 {
413 let figment = build_figment_from_sources(&self.provider_sources);
414 figment.extract().map_err(Into::into)
415 }
416
417 pub fn bootstrap_generic(&self) -> GenericConfiguration {
422 let figment = build_figment_from_sources(&self.provider_sources);
423
424 GenericConfiguration {
425 inner: Arc::new(Inner {
426 figment: RwLock::new(figment),
427 lookup_sources: self.lookup_sources.clone(),
428 event_sender: None,
429 ready_signal: Mutex::new(None),
430 }),
431 }
432 }
433
434 pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
436 let has_dynamic_provider = self
437 .provider_sources
438 .iter()
439 .any(|s| matches!(s, ProviderSource::Dynamic(_)));
440
441 if has_dynamic_provider {
442 let mut receiver_opt = None;
443 for source in self.provider_sources.iter_mut() {
444 if let ProviderSource::Dynamic(ref mut receiver) = source {
445 receiver_opt = receiver.take();
446 break;
447 }
448 }
449 let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
450
451 let figment = build_figment_from_sources(&self.provider_sources);
453
454 let (event_sender, _) = broadcast::channel(100);
455 let (ready_sender, ready_receiver) = oneshot::channel();
456
457 let generic_config = GenericConfiguration {
458 inner: Arc::new(Inner {
459 figment: RwLock::new(figment),
460 lookup_sources: self.lookup_sources,
461 event_sender: Some(event_sender.clone()),
462 ready_signal: Mutex::new(Some(ready_receiver)),
463 }),
464 };
465
466 tokio::spawn(run_dynamic_config_updater(
468 generic_config.inner.clone(),
469 receiver,
470 self.provider_sources,
471 event_sender,
472 ready_sender,
473 ));
474
475 Ok(generic_config)
476 } else {
477 let figment = build_figment_from_sources(&self.provider_sources);
479
480 Ok(GenericConfiguration {
481 inner: Arc::new(Inner {
482 figment: RwLock::new(figment),
483 lookup_sources: self.lookup_sources,
484 event_sender: None,
485 ready_signal: Mutex::new(None),
486 }),
487 })
488 }
489 }
490
491 pub async fn for_tests(
502 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
503 enable_dynamic_configuration: bool,
504 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
505 Self::for_tests_with_provider_factory(file_values, env_vars, enable_dynamic_configuration, &[], || {
506 Serialized::defaults(serde_json::json!({}))
507 })
508 .await
509 }
510
511 pub async fn for_tests_with_provider_factory<P, F>(
520 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
521 enable_dynamic_configuration: bool, key_aliases: &'static [(&'static str, &'static str)], provider_factory: F,
522 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>)
523 where
524 P: Provider + Send + Sync + 'static,
525 F: FnOnce() -> P,
526 {
527 let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
528 let path = &json_file.path();
529 let json_to_write = file_values.unwrap_or(serde_json::json!({}));
530 serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
531
532 let mut loader = ConfigurationLoader::default()
533 .with_key_aliases(key_aliases)
534 .try_from_json(path);
535 let mut maybe_sender = None;
536 if enable_dynamic_configuration {
537 let (sender, receiver) = tokio::sync::mpsc::channel(1);
538 loader = loader.with_dynamic_configuration(receiver);
539 maybe_sender = Some(sender);
540 }
541
542 static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
543
544 let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
545
546 if let Some(pairs) = env_vars.as_ref() {
547 for (k, v) in pairs.iter() {
548 std::env::set_var(k, v);
552 std::env::set_var(format!("TEST_{}", k), v);
553 }
554 }
555
556 let loader = loader.add_providers([provider_factory()]);
558
559 let loader = loader
561 .from_environment("TEST")
562 .expect("should not fail to add environment provider");
563
564 if let Some(pairs) = env_vars.as_ref() {
566 for (k, _) in pairs.iter() {
567 std::env::remove_var(k);
568 std::env::remove_var(format!("TEST_{}", k));
569 }
570 }
571
572 drop(guard);
573
574 let cfg = loader
575 .into_generic()
576 .await
577 .expect("should not fail to build generic configuration");
578
579 (cfg, maybe_sender)
580 }
581}
582
583fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
584 sources.iter().fold(Figment::new(), |figment, source| match source {
585 ProviderSource::Static(p) => figment.admerge(p.clone()),
586 ProviderSource::Dynamic(_) => figment,
588 })
589}
590
591pub fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
595 if !root.is_object() {
596 *root = serde_json::Value::Object(serde_json::Map::new());
597 }
598
599 let mut current = root;
600 let mut segments = key.split('.').peekable();
602
603 while let Some(seg) = segments.next() {
604 let is_leaf = segments.peek().is_none();
605
606 if !current.is_object() {
608 *current = serde_json::Value::Object(serde_json::Map::new());
609 }
610 let node = current.as_object_mut().expect("current node should be an object");
611
612 if is_leaf {
613 node.insert(seg.to_string(), value);
614 break;
615 } else {
616 let should_create_node = match node.get(seg) {
618 Some(v) => !v.is_object(),
619 None => true,
620 };
621 if should_create_node {
623 node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
624 }
625
626 current = node.get_mut(seg).expect("should not fail to get nested object");
628 }
629 }
630}
631
632async fn run_dynamic_config_updater(
633 inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
634 sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
635) {
636 let initial_update = match receiver.recv().await {
638 Some(update) => update,
639 None => {
640 debug!("Dynamic configuration channel closed before initial snapshot.");
642 return;
643 }
644 };
645
646 let mut dynamic_state = match initial_update {
647 ConfigUpdate::Snapshot(state) => state,
648 ConfigUpdate::Partial { .. } => {
649 error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
651 serde_json::Value::Null
652 }
653 };
654
655 let new_figment = provider_sources
657 .iter()
658 .fold(Figment::new(), |figment, source| match source {
659 ProviderSource::Static(p) => figment.admerge(p.clone()),
660 ProviderSource::Dynamic(_) => {
661 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
662 }
663 });
664
665 {
667 let mut figment_guard = inner.figment.write().unwrap();
668 *figment_guard = new_figment.clone();
669 }
670
671 if ready_sender.send(()).is_err() {
673 debug!("Configuration readiness receiver dropped. Updater task shutting down.");
674 return;
675 }
676
677 let mut current_config: figment::value::Value = new_figment.extract().unwrap();
679
680 loop {
682 let update = match receiver.recv().await {
683 Some(update) => update,
684 None => {
685 debug!("Dynamic configuration update channel closed. Updater task shutting down.");
687 return;
688 }
689 };
690
691 match update {
693 ConfigUpdate::Snapshot(new_state) => {
694 debug!("Received configuration snapshot update.");
695 dynamic_state = new_state;
696 }
697 ConfigUpdate::Partial { key, value } => {
698 debug!(%key, "Received partial configuration update.");
699 if dynamic_state.is_null() {
700 dynamic_state = serde_json::Value::Object(serde_json::Map::new());
701 }
702 if dynamic_state.is_object() {
703 upsert(&mut dynamic_state, &key, value);
704 } else {
705 error!(
706 "Received partial update but current dynamic state is not an object. This should not happen."
707 );
708 }
709 }
710 }
711
712 let new_figment = provider_sources
714 .iter()
715 .fold(Figment::new(), |figment, source| match source {
716 ProviderSource::Static(p) => figment.admerge(p.clone()),
717 ProviderSource::Dynamic(_) => {
718 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
719 }
720 });
721
722 let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
723
724 if current_config != new_config {
725 for change in dynamic::diff_config(¤t_config, &new_config) {
726 let _ = sender.send(change);
730 }
731
732 let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
733 error!("Failed to acquire write lock for dynamic configuration: {}", e);
734 e.into_inner()
735 });
736 *figment_guard = new_figment;
737
738 current_config = new_config;
740 }
741 }
742}
743
744#[derive(Debug)]
745struct Inner {
746 figment: RwLock<Figment>,
747 lookup_sources: HashSet<LookupSource>,
748 event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
749 ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
750}
751
752#[derive(Clone, Debug)]
774pub struct GenericConfiguration {
775 inner: Arc<Inner>,
776}
777
778impl GenericConfiguration {
779 pub async fn ready(&self) {
786 let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
789 if let Some(ready_rx) = maybe_ready_rx.take() {
790 if ready_rx.await.is_err() {
792 error!("Failed to receive configuration readiness signal; updater task may have panicked.");
793 }
794 }
795 }
796
797 fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
798 where
799 T: Deserialize<'a>,
800 {
801 let figment_guard = self.inner.figment.read().unwrap();
802 match figment_guard.extract_inner(key) {
803 Ok(value) => Ok(value),
804 Err(e) => {
805 if matches!(e.kind, figment::error::Kind::MissingField(_)) {
806 let fallback_key = key.replace('.', "_");
811 figment_guard
812 .extract_inner(&fallback_key)
813 .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
814 } else {
815 Err(e.into())
816 }
817 }
818 }
819 }
820
821 pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
830 where
831 T: Deserialize<'a>,
832 {
833 self.get(key)
834 }
835
836 pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
843 where
844 T: Default + Deserialize<'a>,
845 {
846 self.get(key).unwrap_or_default()
847 }
848
849 pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
860 where
861 T: Deserialize<'a>,
862 {
863 match self.get(key) {
864 Ok(value) => Ok(Some(value)),
865 Err(ConfigurationError::MissingField { .. }) => Ok(None),
866 Err(e) => Err(e),
867 }
868 }
869
870 pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
876 where
877 T: Deserialize<'a>,
878 {
879 self.inner
880 .figment
881 .read()
882 .unwrap()
883 .extract()
884 .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
885 }
886
887 pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
889 self.inner.event_sender.as_ref().map(|s| s.subscribe())
890 }
891
892 pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
897 FieldUpdateWatcher {
898 key: key.to_string(),
899 rx: self.subscribe_for_updates(),
900 }
901 }
902}
903
904fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
905 match e.kind {
906 Kind::MissingField(field) => {
907 let mut valid_keys = lookup_sources
908 .iter()
909 .map(|source| source.transform_key(&field))
910 .collect::<Vec<_>>();
911
912 valid_keys.insert(0, field.to_string());
914
915 let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
916
917 ConfigurationError::MissingField { help_text, field }
918 }
919 Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
920 field: e.path.join("."),
921 expected_ty,
922 actual_ty: actual_ty.to_string(),
923 },
924 _ => ConfigurationError::Generic { source: e.into() },
925 }
926}
927
928fn has_valid_secret_backend_command(configuration: &Figment) -> bool {
929 configuration
930 .find_value("secret_backend_command")
931 .ok()
932 .is_some_and(|v| v.as_str().filter(|s| !s.is_empty()).is_some())
933}
934
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Builds a figment whose only source is the given JSON literal.
    macro_rules! json_to_figment {
        ($json:tt) => {
            Figment::from(Serialized::defaults(serde_json::json!($json)))
        };
    }

    /// File values shared by most tests.
    fn sample_file_values() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Environment variables shared by most tests.
    fn sample_env_vars() -> Vec<(String, String)> {
        vec![("ENV_VAR".to_string(), "from_env".to_string())]
    }

    /// Waits up to two seconds for a change event for `key`, skipping events for other keys.
    async fn wait_for_change(
        rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str, timeout_message: &str,
    ) -> ConfigChangeEvent {
        tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break ev,
                    Ok(_) => continue,
                    Err(e) => panic!("updates channel closed: {e}"),
                }
            }
        })
        .await
        .expect(timeout_message)
    }

    #[test]
    fn test_has_valid_secret_backend_command() {
        // Not set at all.
        let figment = Figment::new();
        assert!(!has_valid_secret_backend_command(&figment));

        // Set, but empty.
        let figment = json_to_figment!({
            "secret_backend_command": ""
        });
        assert!(!has_valid_secret_backend_command(&figment));

        // Set, but not a string.
        let figment = json_to_figment!({
            "secret_backend_command": false
        });
        assert!(!has_valid_secret_backend_command(&figment));

        // Any non-empty string is accepted.
        let figment = json_to_figment!({
            "secret_backend_command": "/usr/bin/foo"
        });
        assert!(has_valid_secret_backend_command(&figment));

        let figment = json_to_figment!({
            "secret_backend_command": "or anything else"
        });
        assert!(has_valid_secret_backend_command(&figment));
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, _) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), false).await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let env_vars = sample_env_vars();
        let (cfg, sender) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), true).await;
        let sender = sender.expect("sender should exist");
        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "new": "from_snapshot",
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // Values from the file are still visible.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Values from the snapshot are visible too.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_key".to_string(),
                value: "from dynamic update".to_string().into(),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "new_key", "timed out waiting for new_key update").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // A partial update of a nested key leaves its siblings untouched.
        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(true),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "foobar.a", "timed out waiting for foobar.a update").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let env_vars = sample_env_vars();
        let (cfg, sender) =
            ConfigurationLoader::for_tests(Some(sample_file_values()), Some(env_vars.as_slice()), true).await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "env_var": "from_snapshot_env_var"
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // The environment wins over the snapshot.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // Neither of the next two updates changes the merged configuration, so no event is
        // emitted for them.
        sender
            .send(ConfigUpdate::Partial {
                key: "env_var".to_string(),
                value: serde_json::json!("from_partial"),
            })
            .await
            .unwrap();

        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(false),
            })
            .await
            .unwrap();

        // This update does change the configuration; its event marks the point at which the
        // earlier updates have been processed.
        sender
            .send(ConfigUpdate::Partial {
                key: "dummy".to_string(),
                value: serde_json::json!(1),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "dummy", "timed out waiting for sync marker").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), None, true).await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
            .await
            .unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_parent.new_child".to_string(),
                value: serde_json::json!(42),
            })
            .await
            .unwrap();

        // The change is reported under the newly created parent key.
        wait_for_change(
            &mut rx,
            "new_parent",
            "timed out waiting for new_parent.new_child update",
        )
        .await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let env_vars = [("RANDOM_KEY".to_string(), "from_env_only".to_string())];
        let (cfg, _) =
            ConfigurationLoader::for_tests(Some(serde_json::json!({})), Some(env_vars.as_slice()), false).await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        // The sender is kept alive for the whole test; dropping it would close the update channel.
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        assert!(maybe_sender.is_some());

        let res = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}