1#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10 error::Kind,
11 providers::{Env, Serialized},
12 Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::Snafu;
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod duration_string;
21pub mod dynamic;
22mod provider;
23pub mod space_separated;
24
25pub use self::duration_string::{DurationString, ParseDurationError};
26pub use self::dynamic::FieldUpdateWatcher;
27use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
28use self::provider::ResolvedProvider;
29pub use self::space_separated::{deserialize_opt_space_separated_or_seq, deserialize_space_separated_or_seq};
30
31#[derive(Clone)]
32struct ArcProvider(Arc<dyn Provider + Send + Sync>);
33
34impl Provider for ArcProvider {
35 fn metadata(&self) -> figment::Metadata {
36 self.0.metadata()
37 }
38
39 fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
40 self.0.data()
41 }
42}
43
44enum ProviderSource {
45 Static(ArcProvider),
46 Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
47}
48
49impl Clone for ProviderSource {
50 fn clone(&self) -> Self {
51 match self {
52 Self::Static(p) => Self::Static(p.clone()),
53 Self::Dynamic(_) => Self::Dynamic(None),
54 }
55 }
56}
57
58#[derive(Debug, Snafu)]
60#[snafu(context(suffix(false)))]
61pub enum ConfigurationError {
62 #[snafu(display("Environment variable prefix must not be empty."))]
64 EmptyPrefix,
65
66 #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
68 MissingField {
69 help_text: String,
74
75 field: Cow<'static, str>,
77 },
78
79 #[snafu(display(
81 "Expected value for field '{}' to be '{}', got '{}' instead.",
82 field,
83 expected_ty,
84 actual_ty
85 ))]
86 InvalidFieldType {
87 field: String,
91
92 expected_ty: String,
94
95 actual_ty: String,
97 },
98
99 #[snafu(transparent)]
101 Generic {
102 source: GenericError,
104 },
105}
106
107impl From<figment::Error> for ConfigurationError {
108 fn from(e: figment::Error) -> Self {
109 match e.kind {
110 Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
111 field: e.path.join("."),
112 expected_ty,
113 actual_ty: actual_ty.to_string(),
114 },
115 _ => Self::Generic { source: e.into() },
116 }
117 }
118}
119
/// A place where a configuration key could have been supplied, used to build hints for
/// missing-field errors.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// Environment variables starting with `prefix`.
    Environment { prefix: String },
}

impl LookupSource {
    /// Renders `key` the way it would be named in this source.
    ///
    /// For the environment: the prefix, followed by the key with every `.` replaced by `_` and
    /// the result upper-cased.
    fn transform_key(&self, key: &str) -> String {
        match self {
            Self::Environment { prefix } => {
                let suffix = key.replace('.', "_").to_uppercase();
                let mut name = String::with_capacity(prefix.len() + suffix.len());
                name.push_str(prefix);
                name.push_str(&suffix);
                name
            }
        }
    }
}
135
136#[derive(Clone, Default)]
152pub struct ConfigurationLoader {
153 key_aliases: &'static [(&'static str, &'static str)],
154 lookup_sources: HashSet<LookupSource>,
155 provider_sources: Vec<ProviderSource>,
156}
157
158impl ConfigurationLoader {
159 pub fn with_key_aliases(mut self, aliases: &'static [(&'static str, &'static str)]) -> Self {
168 self.key_aliases = aliases;
169 self
170 }
171
172 pub fn add_providers<P, I>(mut self, providers: I) -> Self
182 where
183 P: Provider + Send + Sync + 'static,
184 I: IntoIterator<Item = P>,
185 {
186 for p in providers {
187 self.provider_sources
188 .push(ProviderSource::Static(ArcProvider(Arc::new(p))));
189 }
190 self
191 }
192
193 pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
199 where
200 P: AsRef<std::path::Path>,
201 {
202 let resolved_provider = ResolvedProvider::from_yaml(&path, self.key_aliases)?;
203 self.provider_sources
204 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
205 Ok(self)
206 }
207
208 pub fn try_from_yaml<P>(mut self, path: P) -> Self
212 where
213 P: AsRef<std::path::Path>,
214 {
215 match ResolvedProvider::from_yaml(&path, self.key_aliases) {
216 Ok(resolved_provider) => {
217 self.provider_sources
218 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
219 }
220 Err(e) => {
221 println!(
222 "Unable to read YAML configuration file '{}': {}. Ignoring.",
223 path.as_ref().to_string_lossy(),
224 e
225 );
226 }
227 }
228 self
229 }
230
231 pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
237 where
238 P: AsRef<std::path::Path>,
239 {
240 let resolved_provider = ResolvedProvider::from_json(&path, self.key_aliases)?;
241 self.provider_sources
242 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
243 Ok(self)
244 }
245
246 pub fn try_from_json<P>(mut self, path: P) -> Self
250 where
251 P: AsRef<std::path::Path>,
252 {
253 match ResolvedProvider::from_json(&path, self.key_aliases) {
254 Ok(resolved_provider) => {
255 self.provider_sources
256 .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
257 }
258 Err(e) => {
259 println!(
260 "Unable to read JSON configuration file '{}': {}. Ignoring.",
261 path.as_ref().to_string_lossy(),
262 e
263 );
264 }
265 }
266 self
267 }
268
269 pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
282 if prefix.is_empty() {
283 return Err(ConfigurationError::EmptyPrefix);
284 }
285
286 let prefix = if prefix.ends_with('_') {
287 prefix.to_string()
288 } else {
289 format!("{}_", prefix)
290 };
291
292 let env = Env::prefixed(&prefix).split("__");
294 let values = env.data().unwrap();
295 if let Some(default_dict) = values.get(&figment::Profile::Default) {
296 self.provider_sources
297 .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
298 default_dict.clone(),
299 )))));
300 self.lookup_sources.insert(LookupSource::Environment { prefix });
301 }
302 Ok(self)
303 }
304
305 pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
309 self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
310 self
311 }
312
313 pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
319 where
320 T: Deserialize<'a>,
321 {
322 let figment = build_figment_from_sources(&self.provider_sources);
323 figment.extract().map_err(Into::into)
324 }
325
326 pub fn bootstrap_generic(&self) -> GenericConfiguration {
331 let figment = build_figment_from_sources(&self.provider_sources);
332
333 GenericConfiguration {
334 inner: Arc::new(Inner {
335 figment: RwLock::new(figment),
336 lookup_sources: self.lookup_sources.clone(),
337 event_sender: None,
338 ready_signal: Mutex::new(None),
339 }),
340 }
341 }
342
343 pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
345 let has_dynamic_provider = self
346 .provider_sources
347 .iter()
348 .any(|s| matches!(s, ProviderSource::Dynamic(_)));
349
350 if has_dynamic_provider {
351 let mut receiver_opt = None;
352 for source in self.provider_sources.iter_mut() {
353 if let ProviderSource::Dynamic(ref mut receiver) = source {
354 receiver_opt = receiver.take();
355 break;
356 }
357 }
358 let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
359
360 let figment = build_figment_from_sources(&self.provider_sources);
362
363 let (event_sender, _) = broadcast::channel(100);
364 let (ready_sender, ready_receiver) = oneshot::channel();
365
366 let generic_config = GenericConfiguration {
367 inner: Arc::new(Inner {
368 figment: RwLock::new(figment),
369 lookup_sources: self.lookup_sources,
370 event_sender: Some(event_sender.clone()),
371 ready_signal: Mutex::new(Some(ready_receiver)),
372 }),
373 };
374
375 tokio::spawn(run_dynamic_config_updater(
377 generic_config.inner.clone(),
378 receiver,
379 self.provider_sources,
380 event_sender,
381 ready_sender,
382 ));
383
384 Ok(generic_config)
385 } else {
386 let figment = build_figment_from_sources(&self.provider_sources);
388
389 Ok(GenericConfiguration {
390 inner: Arc::new(Inner {
391 figment: RwLock::new(figment),
392 lookup_sources: self.lookup_sources,
393 event_sender: None,
394 ready_signal: Mutex::new(None),
395 }),
396 })
397 }
398 }
399
400 pub async fn for_tests(
411 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
412 enable_dynamic_configuration: bool,
413 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
414 Self::for_tests_with_provider_factory(file_values, env_vars, enable_dynamic_configuration, &[], || {
415 Serialized::defaults(serde_json::json!({}))
416 })
417 .await
418 }
419
420 pub async fn for_tests_with_provider_factory<P, F>(
429 file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
430 enable_dynamic_configuration: bool, key_aliases: &'static [(&'static str, &'static str)], provider_factory: F,
431 ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>)
432 where
433 P: Provider + Send + Sync + 'static,
434 F: FnOnce() -> P,
435 {
436 let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
437 let path = &json_file.path();
438 let json_to_write = file_values.unwrap_or(serde_json::json!({}));
439 serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
440
441 let mut loader = ConfigurationLoader::default()
442 .with_key_aliases(key_aliases)
443 .try_from_json(path);
444 let mut maybe_sender = None;
445 if enable_dynamic_configuration {
446 let (sender, receiver) = tokio::sync::mpsc::channel(1);
447 loader = loader.with_dynamic_configuration(receiver);
448 maybe_sender = Some(sender);
449 }
450
451 static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
452
453 let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
454
455 if let Some(pairs) = env_vars.as_ref() {
456 for (k, v) in pairs.iter() {
457 std::env::set_var(k, v);
461 std::env::set_var(format!("TEST_{}", k), v);
462 }
463 }
464
465 let loader = loader.add_providers([provider_factory()]);
467
468 let loader = loader
470 .from_environment("TEST")
471 .expect("should not fail to add environment provider");
472
473 if let Some(pairs) = env_vars.as_ref() {
475 for (k, _) in pairs.iter() {
476 std::env::remove_var(k);
477 std::env::remove_var(format!("TEST_{}", k));
478 }
479 }
480
481 drop(guard);
482
483 let cfg = loader
484 .into_generic()
485 .await
486 .expect("should not fail to build generic configuration");
487
488 (cfg, maybe_sender)
489 }
490}
491
492fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
493 sources.iter().fold(Figment::new(), |figment, source| match source {
494 ProviderSource::Static(p) => figment.admerge(p.clone()),
495 ProviderSource::Dynamic(_) => figment,
497 })
498}
499
500pub fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
504 if !root.is_object() {
505 *root = serde_json::Value::Object(serde_json::Map::new());
506 }
507
508 let mut current = root;
509 let mut segments = key.split('.').peekable();
511
512 while let Some(seg) = segments.next() {
513 let is_leaf = segments.peek().is_none();
514
515 if !current.is_object() {
517 *current = serde_json::Value::Object(serde_json::Map::new());
518 }
519 let node = current.as_object_mut().expect("current node should be an object");
520
521 if is_leaf {
522 node.insert(seg.to_string(), value);
523 break;
524 } else {
525 let should_create_node = match node.get(seg) {
527 Some(v) => !v.is_object(),
528 None => true,
529 };
530 if should_create_node {
532 node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
533 }
534
535 current = node.get_mut(seg).expect("should not fail to get nested object");
537 }
538 }
539}
540
541async fn run_dynamic_config_updater(
542 inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
543 sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
544) {
545 let initial_update = match receiver.recv().await {
547 Some(update) => update,
548 None => {
549 debug!("Dynamic configuration channel closed before initial snapshot.");
551 return;
552 }
553 };
554
555 let mut dynamic_state = match initial_update {
556 ConfigUpdate::Snapshot(state) => state,
557 ConfigUpdate::Partial { .. } => {
558 error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
560 serde_json::Value::Null
561 }
562 };
563
564 let new_figment = provider_sources
566 .iter()
567 .fold(Figment::new(), |figment, source| match source {
568 ProviderSource::Static(p) => figment.admerge(p.clone()),
569 ProviderSource::Dynamic(_) => {
570 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
571 }
572 });
573
574 {
576 let mut figment_guard = inner.figment.write().unwrap();
577 *figment_guard = new_figment.clone();
578 }
579
580 if ready_sender.send(()).is_err() {
582 debug!("Configuration readiness receiver dropped. Updater task shutting down.");
583 return;
584 }
585
586 let mut current_config: figment::value::Value = new_figment.extract().unwrap();
588
589 loop {
591 let update = match receiver.recv().await {
592 Some(update) => update,
593 None => {
594 debug!("Dynamic configuration update channel closed. Updater task shutting down.");
596 return;
597 }
598 };
599
600 match update {
602 ConfigUpdate::Snapshot(new_state) => {
603 debug!("Received configuration snapshot update.");
604 dynamic_state = new_state;
605 }
606 ConfigUpdate::Partial { key, value } => {
607 debug!(%key, "Received partial configuration update.");
608 if dynamic_state.is_null() {
609 dynamic_state = serde_json::Value::Object(serde_json::Map::new());
610 }
611 if dynamic_state.is_object() {
612 upsert(&mut dynamic_state, &key, value);
613 } else {
614 error!(
615 "Received partial update but current dynamic state is not an object. This should not happen."
616 );
617 }
618 }
619 }
620
621 let new_figment = provider_sources
623 .iter()
624 .fold(Figment::new(), |figment, source| match source {
625 ProviderSource::Static(p) => figment.admerge(p.clone()),
626 ProviderSource::Dynamic(_) => {
627 figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
628 }
629 });
630
631 let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
632
633 if current_config != new_config {
634 for change in dynamic::diff_config(¤t_config, &new_config) {
635 let _ = sender.send(change);
639 }
640
641 let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
642 error!("Failed to acquire write lock for dynamic configuration: {}", e);
643 e.into_inner()
644 });
645 *figment_guard = new_figment;
646
647 current_config = new_config;
649 }
650 }
651}
652
653#[derive(Debug)]
654struct Inner {
655 figment: RwLock<Figment>,
656 lookup_sources: HashSet<LookupSource>,
657 event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
658 ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
659}
660
661#[derive(Clone, Debug)]
683pub struct GenericConfiguration {
684 inner: Arc<Inner>,
685}
686
687impl GenericConfiguration {
688 pub async fn ready(&self) {
695 let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
698 if let Some(ready_rx) = maybe_ready_rx.take() {
699 if ready_rx.await.is_err() {
701 error!("Failed to receive configuration readiness signal; updater task may have panicked.");
702 }
703 }
704 }
705
706 fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
707 where
708 T: Deserialize<'a>,
709 {
710 let figment_guard = self.inner.figment.read().unwrap();
711 match figment_guard.extract_inner(key) {
712 Ok(value) => Ok(value),
713 Err(e) => {
714 if matches!(e.kind, figment::error::Kind::MissingField(_)) {
715 let fallback_key = key.replace('.', "_");
720 figment_guard
721 .extract_inner(&fallback_key)
722 .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
723 } else {
724 Err(e.into())
725 }
726 }
727 }
728 }
729
730 pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
739 where
740 T: Deserialize<'a>,
741 {
742 self.get(key)
743 }
744
745 pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
752 where
753 T: Default + Deserialize<'a>,
754 {
755 self.get(key).unwrap_or_default()
756 }
757
758 pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
769 where
770 T: Deserialize<'a>,
771 {
772 match self.get(key) {
773 Ok(value) => Ok(Some(value)),
774 Err(ConfigurationError::MissingField { .. }) => Ok(None),
775 Err(e) => Err(e),
776 }
777 }
778
779 pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
785 where
786 T: Deserialize<'a>,
787 {
788 self.inner
789 .figment
790 .read()
791 .unwrap()
792 .extract()
793 .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
794 }
795
796 pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
798 self.inner.event_sender.as_ref().map(|s| s.subscribe())
799 }
800
801 pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
806 FieldUpdateWatcher {
807 key: key.to_string(),
808 rx: self.subscribe_for_updates(),
809 }
810 }
811}
812
813fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
814 match e.kind {
815 Kind::MissingField(field) => {
816 let mut valid_keys = lookup_sources
817 .iter()
818 .map(|source| source.transform_key(&field))
819 .collect::<Vec<_>>();
820
821 valid_keys.insert(0, field.to_string());
823
824 let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
825
826 ConfigurationError::MissingField { help_text, field }
827 }
828 Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
829 field: e.path.join("."),
830 expected_ty,
831 actual_ty: actual_ty.to_string(),
832 },
833 _ => ConfigurationError::Generic { source: e.into() },
834 }
835}
836
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// File-backed values shared by most tests.
    fn sample_file_values() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Waits up to two seconds for a change event for `key`, skipping events for other keys.
    ///
    /// Panics with `timeout_msg` on timeout, or if the updates channel reports an error.
    async fn wait_for_change(
        rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str, timeout_msg: &str,
    ) -> ConfigChangeEvent {
        tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break ev,
                    Ok(_) => continue,
                    Err(e) => panic!("updates channel closed: {e}"),
                }
            }
        })
        .await
        .expect(timeout_msg)
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(sample_file_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(sample_file_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        let snapshot = ConfigUpdate::Snapshot(serde_json::json!({
            "new": "from_snapshot",
        }));
        sender.send(snapshot).await.unwrap();

        cfg.ready().await;

        // Values from the file are still visible.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Values from the snapshot are visible too.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        let add_key = ConfigUpdate::Partial {
            key: "new_key".to_string(),
            value: "from dynamic update".to_string().into(),
        };
        sender.send(add_key).await.unwrap();

        wait_for_change(&mut rx, "new_key", "timed out waiting for new_key update").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        let flip_nested = ConfigUpdate::Partial {
            key: "foobar.a".to_string(),
            value: serde_json::json!(true),
        };
        sender.send(flip_nested).await.unwrap();

        wait_for_change(&mut rx, "foobar.a", "timed out waiting for foobar.a update").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(sample_file_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        let snapshot = ConfigUpdate::Snapshot(serde_json::json!({
            "env_var": "from_snapshot_env_var"
        }));
        sender.send(snapshot).await.unwrap();

        cfg.ready().await;

        // The environment value wins over the snapshot.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        let updates = [
            ("env_var", serde_json::json!("from_partial")),
            ("foobar.a", serde_json::json!(false)),
            // Marker update: once its event arrives, the earlier updates have been processed.
            ("dummy", serde_json::json!(1)),
        ];
        for (key, value) in updates {
            sender
                .send(ConfigUpdate::Partial {
                    key: key.to_string(),
                    value,
                })
                .await
                .unwrap();
        }

        wait_for_change(&mut rx, "dummy", "timed out waiting for sync marker").await;

        // The environment value still wins after the partial update.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_file_values()), None, true).await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
            .await
            .unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        let add_nested = ConfigUpdate::Partial {
            key: "new_parent.new_child".to_string(),
            value: serde_json::json!(42),
        };
        sender.send(add_nested).await.unwrap();

        wait_for_change(
            &mut rx,
            "new_parent",
            "timed out waiting for new_parent.new_child update",
        )
        .await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({})),
            Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        // `maybe_sender` must stay alive for the whole test: dropping it closes the channel.
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        assert!(maybe_sender.is_some());

        let res = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}