Skip to main content

saluki_config/
lib.rs

1//! Primitives for working with typed and untyped configuration data.
2#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10    error::Kind,
11    providers::{Env, Serialized},
12    Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::Snafu;
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod duration_string;
21pub mod dynamic;
22mod provider;
23pub mod space_separated;
24
25pub use self::duration_string::{DurationString, ParseDurationError};
26pub use self::dynamic::FieldUpdateWatcher;
27use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
28use self::provider::ResolvedProvider;
29pub use self::space_separated::{deserialize_opt_space_separated_or_seq, deserialize_space_separated_or_seq};
30
31#[derive(Clone)]
32struct ArcProvider(Arc<dyn Provider + Send + Sync>);
33
34impl Provider for ArcProvider {
35    fn metadata(&self) -> figment::Metadata {
36        self.0.metadata()
37    }
38
39    fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
40        self.0.data()
41    }
42}
43
44enum ProviderSource {
45    Static(ArcProvider),
46    Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
47}
48
49impl Clone for ProviderSource {
50    fn clone(&self) -> Self {
51        match self {
52            Self::Static(p) => Self::Static(p.clone()),
53            Self::Dynamic(_) => Self::Dynamic(None),
54        }
55    }
56}
57
58/// A configuration error.
59#[derive(Debug, Snafu)]
60#[snafu(context(suffix(false)))]
61pub enum ConfigurationError {
62    /// Environment variable prefix was empty.
63    #[snafu(display("Environment variable prefix must not be empty."))]
64    EmptyPrefix,
65
66    /// Requested field was missing from the configuration.
67    #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
68    MissingField {
69        /// Help text describing how to set the missing field.
70        ///
71        /// This is meant to be displayed to the user, and includes environment variable-specific text if environment
72        /// variables had been loaded originally.
73        help_text: String,
74
75        /// Name of the missing field.
76        field: Cow<'static, str>,
77    },
78
79    /// Requested field's data type was not the unexpected data type.
80    #[snafu(display(
81        "Expected value for field '{}' to be '{}', got '{}' instead.",
82        field,
83        expected_ty,
84        actual_ty
85    ))]
86    InvalidFieldType {
87        /// Name of the invalid field.
88        ///
89        /// This is a period-separated path to the field.
90        field: String,
91
92        /// Expected data type.
93        expected_ty: String,
94
95        /// Actual data type.
96        actual_ty: String,
97    },
98
99    /// Generic configuration error.
100    #[snafu(transparent)]
101    Generic {
102        /// Error source.
103        source: GenericError,
104    },
105}
106
107impl From<figment::Error> for ConfigurationError {
108    fn from(e: figment::Error) -> Self {
109        match e.kind {
110            Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
111                field: e.path.join("."),
112                expected_ty,
113                actual_ty: actual_ty.to_string(),
114            },
115            _ => Self::Generic { source: e.into() },
116        }
117    }
118}
119
/// An alternative place where a configuration key may be supplied, used when building help text for missing keys.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// The configuration key is looked up in a form suitable for environment variables.
    Environment {
        /// Environment variable prefix; expected to already be uppercased and to end with an underscore.
        prefix: String,
    },
}

impl LookupSource {
    /// Returns `key` spelled the way this lookup source expects it.
    ///
    /// For environment variables, periods become underscores and the key is uppercased. The prefix is prepended
    /// verbatim: it is not re-normalized here, because the code that configures the environment provider is expected
    /// to have normalized it already.
    fn transform_key(&self, key: &str) -> String {
        let Self::Environment { prefix } = self;
        let mut variable_name = String::with_capacity(prefix.len() + key.len());
        variable_name.push_str(prefix);
        variable_name.push_str(&key.replace('.', "_").to_uppercase());
        variable_name
    }
}
135
136/// A configuration loader that can pull from various sources.
137///
138/// This loader provides a wrapper around a lower-level library, `figment`, to expose a simpler and focused API for both
139/// loading configuration data from various sources, as well as querying it.
140///
141/// A variety of configuration sources can be configured (see below), with an implicit priority based on the order in
142/// which sources are added: sources added later take precedence over sources prior. Additionally, either a typed value
143/// can be extracted from the configuration ([`into_typed`][Self::into_typed]), or the raw configuration data can be
144/// accessed via a generic API ([`into_generic`][Self::into_generic]).
145///
146/// # Supported sources
147///
148/// - YAML file
149/// - JSON file
150/// - environment variables (must be prefixed; see [`from_environment`][Self::from_environment])
151#[derive(Clone, Default)]
152pub struct ConfigurationLoader {
153    key_aliases: &'static [(&'static str, &'static str)],
154    lookup_sources: HashSet<LookupSource>,
155    provider_sources: Vec<ProviderSource>,
156}
157
158impl ConfigurationLoader {
159    /// Sets key aliases to apply when loading file-based configuration sources.
160    ///
161    /// Each entry is `(nested_path, flat_key)`. When a YAML or JSON file contains a value at `nested_path`
162    /// (dot-separated), that value is also emitted under `flat_key` at the top level — but only if `flat_key`
163    /// is not already explicitly set at the top level. This ensures that both YAML nested format and flat env var
164    /// format produce the same Figment key, so source precedence (env vars > file) works correctly.
165    ///
166    /// Must be called before any file-loading methods ([`from_yaml`][Self::from_yaml], etc.) to take effect.
167    pub fn with_key_aliases(mut self, aliases: &'static [(&'static str, &'static str)]) -> Self {
168        self.key_aliases = aliases;
169        self
170    }
171
172    /// Appends one or more providers to the configuration chain.
173    ///
174    /// Sources are merged in the order they are added: later sources take precedence over earlier ones. Call
175    /// this method after any file-loading methods and before [`from_environment`][Self::from_environment] to
176    /// place the added providers at the correct intermediate precedence level:
177    ///
178    /// ```text
179    /// file providers  <  add_providers(...)  <  from_environment(...)
180    /// ```
181    pub fn add_providers<P, I>(mut self, providers: I) -> Self
182    where
183        P: Provider + Send + Sync + 'static,
184        I: IntoIterator<Item = P>,
185    {
186        for p in providers {
187            self.provider_sources
188                .push(ProviderSource::Static(ArcProvider(Arc::new(p))));
189        }
190        self
191    }
192
193    /// Loads the given YAML configuration file.
194    ///
195    /// # Errors
196    ///
197    /// If the file could not be read, or if the file is not valid YAML, an error will be returned.
198    pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
199    where
200        P: AsRef<std::path::Path>,
201    {
202        let resolved_provider = ResolvedProvider::from_yaml(&path, self.key_aliases)?;
203        self.provider_sources
204            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
205        Ok(self)
206    }
207
208    /// Attempts to load the given YAML configuration file, ignoring any errors.
209    ///
210    /// Errors include the file not existing, not being readable/accessible, and not being valid YAML.
211    pub fn try_from_yaml<P>(mut self, path: P) -> Self
212    where
213        P: AsRef<std::path::Path>,
214    {
215        match ResolvedProvider::from_yaml(&path, self.key_aliases) {
216            Ok(resolved_provider) => {
217                self.provider_sources
218                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
219            }
220            Err(e) => {
221                println!(
222                    "Unable to read YAML configuration file '{}': {}. Ignoring.",
223                    path.as_ref().to_string_lossy(),
224                    e
225                );
226            }
227        }
228        self
229    }
230
231    /// Loads the given JSON configuration file.
232    ///
233    /// # Errors
234    ///
235    /// If the file could not be read, or if the file is not valid JSON, an error will be returned.
236    pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
237    where
238        P: AsRef<std::path::Path>,
239    {
240        let resolved_provider = ResolvedProvider::from_json(&path, self.key_aliases)?;
241        self.provider_sources
242            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
243        Ok(self)
244    }
245
246    /// Attempts to load the given JSON configuration file, ignoring any errors.
247    ///
248    /// Errors include the file not existing, not being readable/accessible, and not being valid JSON.
249    pub fn try_from_json<P>(mut self, path: P) -> Self
250    where
251        P: AsRef<std::path::Path>,
252    {
253        match ResolvedProvider::from_json(&path, self.key_aliases) {
254            Ok(resolved_provider) => {
255                self.provider_sources
256                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
257            }
258            Err(e) => {
259                println!(
260                    "Unable to read JSON configuration file '{}': {}. Ignoring.",
261                    path.as_ref().to_string_lossy(),
262                    e
263                );
264            }
265        }
266        self
267    }
268
269    /// Loads configuration from environment variables.
270    ///
271    /// The prefix given will have an underscore appended to it if it does not already end with one. For
272    /// example, with a prefix of `app`, any environment variable starting with `app_` would be matched. The
273    /// prefix is case-insensitive.
274    ///
275    /// Sources are merged in the order they are added, with later sources taking precedence over earlier ones.
276    /// Sources added after this call will have higher precedence than environment variables.
277    ///
278    /// # Errors
279    ///
280    /// If the prefix is empty, an error will be returned.
281    pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
282        if prefix.is_empty() {
283            return Err(ConfigurationError::EmptyPrefix);
284        }
285
286        let prefix = if prefix.ends_with('_') {
287            prefix.to_string()
288        } else {
289            format!("{}_", prefix)
290        };
291
292        // Convert to use Serialized::defaults since, Env isn't Send + Sync
293        let env = Env::prefixed(&prefix).split("__");
294        let values = env.data().unwrap();
295        if let Some(default_dict) = values.get(&figment::Profile::Default) {
296            self.provider_sources
297                .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
298                    default_dict.clone(),
299                )))));
300            self.lookup_sources.insert(LookupSource::Environment { prefix });
301        }
302        Ok(self)
303    }
304
305    /// Enables dynamic configuration.
306    ///
307    /// The receiver is used in `run_dynamic_config_updater` to handle retrieving the initial snapshot and subsequent updates.
308    pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
309        self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
310        self
311    }
312
313    /// Consumes the configuration loader, deserializing it as `T`.
314    ///
315    /// ## Errors
316    ///
317    /// If the configuration could not be deserialized into `T`, an error will be returned.
318    pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
319    where
320        T: Deserialize<'a>,
321    {
322        let figment = build_figment_from_sources(&self.provider_sources);
323        figment.extract().map_err(Into::into)
324    }
325
326    /// Creates a bootstrap `GenericConfiguration` without consuming the loader.
327    ///
328    /// This creates a static snapshot of the configuration loaded so far. As this is intended for bootstrapping
329    /// before dynamic configuration is active, the dynamic provider is ignored.
330    pub fn bootstrap_generic(&self) -> GenericConfiguration {
331        let figment = build_figment_from_sources(&self.provider_sources);
332
333        GenericConfiguration {
334            inner: Arc::new(Inner {
335                figment: RwLock::new(figment),
336                lookup_sources: self.lookup_sources.clone(),
337                event_sender: None,
338                ready_signal: Mutex::new(None),
339            }),
340        }
341    }
342
343    /// Consumes the configuration loader and wraps it in a generic wrapper.
344    pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
345        let has_dynamic_provider = self
346            .provider_sources
347            .iter()
348            .any(|s| matches!(s, ProviderSource::Dynamic(_)));
349
350        if has_dynamic_provider {
351            let mut receiver_opt = None;
352            for source in self.provider_sources.iter_mut() {
353                if let ProviderSource::Dynamic(ref mut receiver) = source {
354                    receiver_opt = receiver.take();
355                    break;
356                }
357            }
358            let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
359
360            // Build the initial figment object from the static providers. The dynamic provider is empty for now.
361            let figment = build_figment_from_sources(&self.provider_sources);
362
363            let (event_sender, _) = broadcast::channel(100);
364            let (ready_sender, ready_receiver) = oneshot::channel();
365
366            let generic_config = GenericConfiguration {
367                inner: Arc::new(Inner {
368                    figment: RwLock::new(figment),
369                    lookup_sources: self.lookup_sources,
370                    event_sender: Some(event_sender.clone()),
371                    ready_signal: Mutex::new(Some(ready_receiver)),
372                }),
373            };
374
375            // Spawn the background task to handle retrieving the initial snapshot and subsequent updates.
376            tokio::spawn(run_dynamic_config_updater(
377                generic_config.inner.clone(),
378                receiver,
379                self.provider_sources,
380                event_sender,
381                ready_sender,
382            ));
383
384            Ok(generic_config)
385        } else {
386            // Otherwise, just build the static configuration.
387            let figment = build_figment_from_sources(&self.provider_sources);
388
389            Ok(GenericConfiguration {
390                inner: Arc::new(Inner {
391                    figment: RwLock::new(figment),
392                    lookup_sources: self.lookup_sources,
393                    event_sender: None,
394                    ready_signal: Mutex::new(None),
395                }),
396            })
397        }
398    }
399
400    /// Configures a [`GenericConfiguration`] that is suitable for tests.
401    ///
402    /// This configures the loader with the following defaults:
403    ///
404    /// - configuration from a JSON file
405    /// - configuration from environment variables
406    ///
407    /// If `enable_dynamic_configuration` is true, a dynamic configuration sender is returned.
408    ///
409    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
410    pub async fn for_tests(
411        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
412        enable_dynamic_configuration: bool,
413    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
414        Self::for_tests_with_provider_factory(file_values, env_vars, enable_dynamic_configuration, &[], || {
415            Serialized::defaults(serde_json::json!({}))
416        })
417        .await
418    }
419
420    /// Like [`for_tests`][Self::for_tests], but applies `key_aliases` during file loading and calls
421    /// `provider_factory` to build an additional provider inserted between the file provider and the
422    /// environment provider.
423    ///
424    /// The factory is called after test environment variables have been set, so any env var reads it performs
425    /// (e.g. in `DatadogRemapper`) are consistent with the test's env setup.
426    ///
427    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
428    pub async fn for_tests_with_provider_factory<P, F>(
429        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
430        enable_dynamic_configuration: bool, key_aliases: &'static [(&'static str, &'static str)], provider_factory: F,
431    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>)
432    where
433        P: Provider + Send + Sync + 'static,
434        F: FnOnce() -> P,
435    {
436        let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
437        let path = &json_file.path();
438        let json_to_write = file_values.unwrap_or(serde_json::json!({}));
439        serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
440
441        let mut loader = ConfigurationLoader::default()
442            .with_key_aliases(key_aliases)
443            .try_from_json(path);
444        let mut maybe_sender = None;
445        if enable_dynamic_configuration {
446            let (sender, receiver) = tokio::sync::mpsc::channel(1);
447            loader = loader.with_dynamic_configuration(receiver);
448            maybe_sender = Some(sender);
449        }
450
451        static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
452
453        let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
454
455        if let Some(pairs) = env_vars.as_ref() {
456            for (k, v) in pairs.iter() {
457                // Set under both the raw name and the TEST_ prefix:
458                //   - Raw name: available to any env-reading providers (e.g. DatadogRemapper)
459                //   - TEST_ prefix: read by from_environment("TEST") (simulates DD_ prefix)
460                std::env::set_var(k, v);
461                std::env::set_var(format!("TEST_{}", k), v);
462            }
463        }
464
465        // Build and insert the extra provider while env vars are set so it can snapshot them.
466        let loader = loader.add_providers([provider_factory()]);
467
468        // Add environment provider last so it has the highest precedence.
469        let loader = loader
470            .from_environment("TEST")
471            .expect("should not fail to add environment provider");
472
473        // Clean up test-provided env vars now that all providers have been built.
474        if let Some(pairs) = env_vars.as_ref() {
475            for (k, _) in pairs.iter() {
476                std::env::remove_var(k);
477                std::env::remove_var(format!("TEST_{}", k));
478            }
479        }
480
481        drop(guard);
482
483        let cfg = loader
484            .into_generic()
485            .await
486            .expect("should not fail to build generic configuration");
487
488        (cfg, maybe_sender)
489    }
490}
491
492fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
493    sources.iter().fold(Figment::new(), |figment, source| match source {
494        ProviderSource::Static(p) => figment.admerge(p.clone()),
495        // No-op. The merging is handled by the updater task.
496        ProviderSource::Dynamic(_) => figment,
497    })
498}
499
500/// Inserts or updates a value for a key.
501///
502/// Intermediate objects are created if they don't exist.
503pub fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
504    if !root.is_object() {
505        *root = serde_json::Value::Object(serde_json::Map::new());
506    }
507
508    let mut current = root;
509    // Create a new node for each segment if the key is dotted.
510    let mut segments = key.split('.').peekable();
511
512    while let Some(seg) = segments.next() {
513        let is_leaf = segments.peek().is_none();
514
515        // Ensure current is an object before operating
516        if !current.is_object() {
517            *current = serde_json::Value::Object(serde_json::Map::new());
518        }
519        let node = current.as_object_mut().expect("current node should be an object");
520
521        if is_leaf {
522            node.insert(seg.to_string(), value);
523            break;
524        } else {
525            // Ensure child exists and is an object
526            let should_create_node = match node.get(seg) {
527                Some(v) => !v.is_object(),
528                None => true,
529            };
530            // Check if we need to create an intermediate node if it doesn't exist.
531            if should_create_node {
532                node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
533            }
534
535            // Advance the current node to the next level.
536            current = node.get_mut(seg).expect("should not fail to get nested object");
537        }
538    }
539}
540
541async fn run_dynamic_config_updater(
542    inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
543    sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
544) {
545    // The first message on the channel will be the initial snapshot.
546    let initial_update = match receiver.recv().await {
547        Some(update) => update,
548        None => {
549            // The channel was closed before we even received the initial snapshot.
550            debug!("Dynamic configuration channel closed before initial snapshot.");
551            return;
552        }
553    };
554
555    let mut dynamic_state = match initial_update {
556        ConfigUpdate::Snapshot(state) => state,
557        ConfigUpdate::Partial { .. } => {
558            // This is theoretically unreachable, as `configstream` should always send a snapshot first.
559            error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
560            serde_json::Value::Null
561        }
562    };
563
564    // Rebuild the configuration with the initial snapshot.
565    let new_figment = provider_sources
566        .iter()
567        .fold(Figment::new(), |figment, source| match source {
568            ProviderSource::Static(p) => figment.admerge(p.clone()),
569            ProviderSource::Dynamic(_) => {
570                figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
571            }
572        });
573
574    // Update the main figment object and then release the lock.
575    {
576        let mut figment_guard = inner.figment.write().unwrap();
577        *figment_guard = new_figment.clone();
578    }
579
580    // Signal that the initial snapshot has been processed and the configuration is ready.
581    if ready_sender.send(()).is_err() {
582        debug!("Configuration readiness receiver dropped. Updater task shutting down.");
583        return;
584    }
585
586    // Set our "current" state for the main loop.
587    let mut current_config: figment::value::Value = new_figment.extract().unwrap();
588
589    // Enter the main loop to process subsequent updates.
590    loop {
591        let update = match receiver.recv().await {
592            Some(update) => update,
593            None => {
594                // The sender was dropped, which means the config stream has terminated. We can exit.
595                debug!("Dynamic configuration update channel closed. Updater task shutting down.");
596                return;
597            }
598        };
599
600        // Update our local dynamic state based on the received message.
601        match update {
602            ConfigUpdate::Snapshot(new_state) => {
603                debug!("Received configuration snapshot update.");
604                dynamic_state = new_state;
605            }
606            ConfigUpdate::Partial { key, value } => {
607                debug!(%key, "Received partial configuration update.");
608                if dynamic_state.is_null() {
609                    dynamic_state = serde_json::Value::Object(serde_json::Map::new());
610                }
611                if dynamic_state.is_object() {
612                    upsert(&mut dynamic_state, &key, value);
613                } else {
614                    error!(
615                        "Received partial update but current dynamic state is not an object. This should not happen."
616                    );
617                }
618            }
619        }
620
621        // Rebuild the figment object on every update, respecting the original provider order.
622        let new_figment = provider_sources
623            .iter()
624            .fold(Figment::new(), |figment, source| match source {
625                ProviderSource::Static(p) => figment.admerge(p.clone()),
626                ProviderSource::Dynamic(_) => {
627                    figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
628                }
629            });
630
631        let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
632
633        if current_config != new_config {
634            for change in dynamic::diff_config(&current_config, &new_config) {
635                // Send the change event to any receivers of the dynamic handler.
636                // If there are no receivers, `send` will fail. This is expected and fine,
637                // so we can ignore the error to avoid log spam.
638                let _ = sender.send(change);
639            }
640
641            let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
642                error!("Failed to acquire write lock for dynamic configuration: {}", e);
643                e.into_inner()
644            });
645            *figment_guard = new_figment;
646
647            // Update our "current" state for the next iteration.
648            current_config = new_config;
649        }
650    }
651}
652
653#[derive(Debug)]
654struct Inner {
655    figment: RwLock<Figment>,
656    lookup_sources: HashSet<LookupSource>,
657    event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
658    ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
659}
660
661/// A generic configuration object.
662///
663/// This represents the merged configuration derived from [`ConfigurationLoader`] in its raw form.  Values can be
664/// queried by key, and can be extracted either as typed values or in their raw form.
665///
666/// Keys must be in the form of `a.b.c`, where periods (`.`) as used to indicate a nested value.
667///
668/// Using an example JSON configuration:
669///
670/// ```json
671/// {
672///   "a": {
673///     "b": {
674///       "c": "value"
675///     }
676///   }
677/// }
678/// ```
679///
680/// Querying for the value of `a.b.c` would return `"value"`, and querying for `a.b` would return the nested object `{
681/// "c": "value" }`.
682#[derive(Clone, Debug)]
683pub struct GenericConfiguration {
684    inner: Arc<Inner>,
685}
686
687impl GenericConfiguration {
688    /// Waits for the configuration to be ready, if dynamic configuration is enabled.
689    ///
690    /// If dynamic configuration is in use, this method will asynchronously wait until the first snapshot has been
691    /// received and applied.
692    ///
693    /// If dynamic configuration is not used, it returns immediately.
694    pub async fn ready(&self) {
695        // We need a lock to both ensure that multiple callers can race against this,
696        // and to allow us mutable access to consume the receiver.
697        let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
698        if let Some(ready_rx) = maybe_ready_rx.take() {
699            // We're the first caller to wait for readiness.
700            if ready_rx.await.is_err() {
701                error!("Failed to receive configuration readiness signal; updater task may have panicked.");
702            }
703        }
704    }
705
706    fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
707    where
708        T: Deserialize<'a>,
709    {
710        let figment_guard = self.inner.figment.read().unwrap();
711        match figment_guard.extract_inner(key) {
712            Ok(value) => Ok(value),
713            Err(e) => {
714                if matches!(e.kind, figment::error::Kind::MissingField(_)) {
715                    // We might have been given a key that uses nested notation -- `foo.bar` -- but is only present in the
716                    // environment variables. We specifically don't want to use a different separator in environment
717                    // variables to map to nested key separators, so we simply try again here but with all nested key
718                    // separators (`.`) replaced with `_`, to match environment variables.
719                    let fallback_key = key.replace('.', "_");
720                    figment_guard
721                        .extract_inner(&fallback_key)
722                        .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
723                } else {
724                    Err(e.into())
725                }
726            }
727        }
728    }
729
730    /// Gets a configuration value by key.
731    ///
732    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
733    ///
734    /// ## Errors
735    ///
736    /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error
737    /// variant will be returned.
738    pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
739    where
740        T: Deserialize<'a>,
741    {
742        self.get(key)
743    }
744
745    /// Gets a configuration value by key, or the default value if a key does not exist or could not be deserialized.
746    ///
747    /// The `Default` implementation of `T` will be used both if the key could not be found, as well as for any error
748    /// during deserialization. This effectively swallows any errors and should generally be used sparingly.
749    ///
750    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
751    pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
752    where
753        T: Default + Deserialize<'a>,
754    {
755        self.get(key).unwrap_or_default()
756    }
757
758    /// Gets a configuration value by key, if it exists.
759    ///
760    /// If the key exists in the configuration, and can be deserialized, `Ok(Some(value))` is returned. Otherwise,
761    /// `Ok(None)` will be returned.
762    ///
763    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
764    ///
765    /// ## Errors
766    ///
767    /// If the value could not be deserialized into `T`, an error will be returned.
768    pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
769    where
770        T: Deserialize<'a>,
771    {
772        match self.get(key) {
773            Ok(value) => Ok(Some(value)),
774            Err(ConfigurationError::MissingField { .. }) => Ok(None),
775            Err(e) => Err(e),
776        }
777    }
778
779    /// Attempts to deserialize the entire configuration as `T`.
780    ///
781    /// ## Errors
782    ///
783    /// If the value could not be deserialized into `T`, an error will be returned.
784    pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
785    where
786        T: Deserialize<'a>,
787    {
788        self.inner
789            .figment
790            .read()
791            .unwrap()
792            .extract()
793            .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
794    }
795
796    /// Subscribes for updates to the configuration.
797    pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
798        self.inner.event_sender.as_ref().map(|s| s.subscribe())
799    }
800
801    /// Creates a watcher that yields only when the given key changes.
802    ///
803    /// If dynamic configuration is disabled, the returned watcher's `changed()`
804    /// will wait indefinitely.
805    pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
806        FieldUpdateWatcher {
807            key: key.to_string(),
808            rx: self.subscribe_for_updates(),
809        }
810    }
811}
812
813fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
814    match e.kind {
815        Kind::MissingField(field) => {
816            let mut valid_keys = lookup_sources
817                .iter()
818                .map(|source| source.transform_key(&field))
819                .collect::<Vec<_>>();
820
821            // Always specify the original key as a valid key to try.
822            valid_keys.insert(0, field.to_string());
823
824            let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
825
826            ConfigurationError::MissingField { help_text, field }
827        }
828        Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
829            field: e.path.join("."),
830            expected_ty,
831            actual_ty: actual_ty.to_string(),
832        },
833        _ => ConfigurationError::Generic { source: e.into() },
834    }
835}
836
#[cfg(test)]
mod tests {
    use super::*;

    /// How long a test waits for an expected change event before failing.
    const UPDATE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);

    /// Drains change events from `rx` until one for `key` arrives.
    ///
    /// Panics if receiving fails (channel closed or receiver lagged), or if no matching event arrives within
    /// `UPDATE_TIMEOUT`.
    async fn wait_for_update(rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str) {
        let received = tokio::time::timeout(UPDATE_TIMEOUT, async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break,
                    Ok(_) => continue,
                    Err(e) => panic!("failed to receive config update: {e}"),
                }
            }
        })
        .await;
        assert!(received.is_ok(), "timed out waiting for `{key}` update");
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({
                "foo": "bar",
                "baz": 5,
                "foobar": { "a": false, "b": "c" }
            })),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({
                "foo": "bar",
                "baz": 5,
                "foobar": { "a": false, "b": "c" }
            })),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");
        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "new": "from_snapshot",
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // Test that existing values still exist.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Test that new values from the snapshot exist.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_key".to_string(),
                value: "from dynamic update".to_string().into(),
            })
            .await
            .unwrap();

        wait_for_update(&mut rx, "new_key").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // Test that an update with a nested key is applied.
        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(true),
            })
            .await
            .unwrap();

        wait_for_update(&mut rx, "foobar.a").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({
                "foo": "bar",
                "baz": 5,
                "foobar": { "a": false, "b": "c" }
            })),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "env_var": "from_snapshot_env_var"
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // Env provider has highest precedence so the snapshot should not override it.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // Send a partial update that attempts to override the env-backed key.
        sender
            .send(ConfigUpdate::Partial {
                key: "env_var".to_string(),
                value: serde_json::json!("from_partial"),
            })
            .await
            .unwrap();

        // Also send a nested partial update. `foobar.a` is not set via the environment in this test, so this only
        // checks that unrelated dynamic updates do not disturb the env-backed `env_var`.
        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(false),
            })
            .await
            .unwrap();

        // Send a dummy partial update to ensure the updater has processed prior partials.
        sender
            .send(ConfigUpdate::Partial {
                key: "dummy".to_string(),
                value: serde_json::json!(1),
            })
            .await
            .unwrap();

        wait_for_update(&mut rx, "dummy").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({
                "foo": "bar",
                "baz": 5,
                "foobar": { "a": false, "b": "c" }
            })),
            None,
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
            .await
            .unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_parent.new_child".to_string(),
                value: serde_json::json!(42),
            })
            .await
            .unwrap();

        // The `new_parent` object did not exist before, so the diff emits an event for the object `new_parent`
        // rather than for `new_parent.new_child`.
        wait_for_update(&mut rx, "new_parent").await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({})),
            Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(std::time::Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        // Enable dynamic but do not send the initial snapshot.
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        assert!(maybe_sender.is_some());

        // ready() should not resolve until the initial snapshot is processed.
        let res = tokio::time::timeout(std::time::Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}