saluki_config/
lib.rs

1//! Primitives for working with typed and untyped configuration data.
2#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10    error::Kind,
11    providers::{Env, Serialized},
12    Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32    fn metadata(&self) -> figment::Metadata {
33        self.0.metadata()
34    }
35
36    fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37        self.0.data()
38    }
39}
40
41enum ProviderSource {
42    Static(ArcProvider),
43    Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47    fn clone(&self) -> Self {
48        match self {
49            Self::Static(p) => Self::Static(p.clone()),
50            Self::Dynamic(_) => Self::Dynamic(None),
51        }
52    }
53}
54
/// A configuration error.
#[derive(Debug, Snafu)]
#[snafu(context(suffix(false)))]
pub enum ConfigurationError {
    /// Environment variable prefix was empty.
    #[snafu(display("Environment variable prefix must not be empty."))]
    EmptyPrefix,

    /// Requested field was missing from the configuration.
    #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
    MissingField {
        /// Help text describing how to set the missing field.
        ///
        /// This is meant to be displayed to the user, and includes environment variable-specific text if environment
        /// variables had been loaded originally.
        help_text: String,

        /// Name of the missing field.
        field: Cow<'static, str>,
    },

    /// Requested field's data type was not the expected data type.
    #[snafu(display(
        "Expected value for field '{}' to be '{}', got '{}' instead.",
        field,
        expected_ty,
        actual_ty
    ))]
    InvalidFieldType {
        /// Name of the invalid field.
        ///
        /// This is a period-separated path to the field.
        field: String,

        /// Expected data type.
        expected_ty: String,

        /// Actual data type.
        actual_ty: String,
    },

    /// Generic configuration error.
    #[snafu(display("Failed to query configuration."))]
    Generic {
        /// Error source.
        source: GenericError,
    },

    /// Secrets resolution error.
    #[snafu(display("Failed to resolve secrets."))]
    Secrets {
        /// Error source.
        source: secrets::Error,
    },
}
110
111impl From<figment::Error> for ConfigurationError {
112    fn from(e: figment::Error) -> Self {
113        match e.kind {
114            Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115                field: e.path.join("."),
116                expected_ty,
117                actual_ty: actual_ty.to_string(),
118            },
119            _ => Self::Generic { source: e.into() },
120        }
121    }
122}
123
/// Describes an alternate form under which a configuration key may be looked up.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// The configuration key is looked up in a form suitable for environment variables.
    Environment { prefix: String },
}

impl LookupSource {
    /// Rewrites `key` into the form used by this lookup source.
    ///
    /// For environment variables, nested key separators (`.`) become underscores, the key is uppercased, and the
    /// prefix is prepended.
    fn transform_key(&self, key: &str) -> String {
        let LookupSource::Environment { prefix } = self;

        // The prefix is emitted verbatim: it is expected to have been normalized (trailing underscore included) when
        // the environment provider was configured, so only the key itself is rewritten here.
        let flattened: String = key.chars().map(|c| if c == '.' { '_' } else { c }).collect();
        let uppercased = flattened.to_uppercase();

        let mut env_key = String::with_capacity(prefix.len() + uppercased.len());
        env_key.push_str(prefix);
        env_key.push_str(&uppercased);
        env_key
    }
}
139
140/// A configuration loader that can pull from various sources.
141///
142/// This loader provides a wrapper around a lower-level library, `figment`, to expose a simpler and focused API for both
143/// loading configuration data from various sources, as well as querying it.
144///
145/// A variety of configuration sources can be configured (see below), with an implicit priority based on the order in
146/// which sources are added: sources added later take precedence over sources prior. Additionally, either a typed value
147/// can be extracted from the configuration ([`into_typed`][Self::into_typed]), or the raw configuration data can be
148/// accessed via a generic API ([`into_generic`][Self::into_generic]).
149///
150/// # Supported sources
151///
152/// - YAML file
153/// - JSON file
154/// - environment variables (must be prefixed; see [`from_environment`][Self::from_environment])
// NOTE: cloning a loader clones its provider sources, and a cloned dynamic source does not carry over the update
// receiver (see `ProviderSource::clone`).
#[derive(Clone, Default)]
pub struct ConfigurationLoader {
    // Alternate lookup forms (currently only environment variables) handed to the resulting `GenericConfiguration`.
    lookup_sources: HashSet<LookupSource>,
    // Configuration sources in the order they were added; they are merged in this order when building a figment.
    provider_sources: Vec<ProviderSource>,
}
160
161impl ConfigurationLoader {
162    /// Loads the given YAML configuration file.
163    ///
164    /// # Errors
165    ///
166    /// If the file could not be read, or if the file is not valid YAML, an error will be returned.
167    pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
168    where
169        P: AsRef<std::path::Path>,
170    {
171        let resolved_provider = ResolvedProvider::from_yaml(&path).context(Generic)?;
172        self.provider_sources
173            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
174        Ok(self)
175    }
176
177    /// Attempts to load the given YAML configuration file, ignoring any errors.
178    ///
179    /// Errors include the file not existing, not being readable/accessible, and not being valid YAML.
180    pub fn try_from_yaml<P>(mut self, path: P) -> Self
181    where
182        P: AsRef<std::path::Path>,
183    {
184        match ResolvedProvider::from_yaml(&path) {
185            Ok(resolved_provider) => {
186                self.provider_sources
187                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
188            }
189            Err(e) => {
190                println!(
191                    "Unable to read YAML configuration file '{}': {}. Ignoring.",
192                    path.as_ref().to_string_lossy(),
193                    e
194                );
195            }
196        }
197        self
198    }
199
200    /// Loads the given JSON configuration file.
201    ///
202    /// # Errors
203    ///
204    /// If the file could not be read, or if the file is not valid JSON, an error will be returned.
205    pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
206    where
207        P: AsRef<std::path::Path>,
208    {
209        let resolved_provider = ResolvedProvider::from_json(&path).context(Generic)?;
210        self.provider_sources
211            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
212        Ok(self)
213    }
214
215    /// Attempts to load the given JSON configuration file, ignoring any errors.
216    ///
217    /// Errors include the file not existing, not being readable/accessible, and not being valid JSON.
218    pub fn try_from_json<P>(mut self, path: P) -> Self
219    where
220        P: AsRef<std::path::Path>,
221    {
222        match ResolvedProvider::from_json(&path) {
223            Ok(resolved_provider) => {
224                self.provider_sources
225                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
226            }
227            Err(e) => {
228                println!(
229                    "Unable to read JSON configuration file '{}': {}. Ignoring.",
230                    path.as_ref().to_string_lossy(),
231                    e
232                );
233            }
234        }
235        self
236    }
237
238    /// Loads configuration from environment variables.
239    ///
240    /// The prefix given will have an underscore appended to it if it does not already end with one. For example, with a
241    /// prefix of `app`, any environment variable starting with `app_` would be matched.
242    ///
243    /// The prefix is case-insensitive.
244    ///
245    /// # Errors
246    ///
247    /// If the prefix is empty, an error will be returned.
248    pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
249        if prefix.is_empty() {
250            return Err(ConfigurationError::EmptyPrefix);
251        }
252
253        let prefix = if prefix.ends_with('_') {
254            prefix.to_string()
255        } else {
256            format!("{}_", prefix)
257        };
258
259        // Convert to use Serialized::defaults since, Env isn't Send + Sync
260        let env = Env::prefixed(&prefix);
261        let values = env.data().unwrap();
262        if let Some(default_dict) = values.get(&figment::Profile::Default) {
263            self.provider_sources
264                .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
265                    default_dict.clone(),
266                )))));
267            self.lookup_sources.insert(LookupSource::Environment { prefix });
268        }
269        Ok(self)
270    }
271
272    /// Resolves secrets in the configuration based on available secret backend configuration.
273    ///
274    /// This will attempt to resolve any secret references (format shown below) in the configuration by using a "secrets
275    /// backend", which is a user-provided command that utilizes a simple JSON-based protocol to accept secrets to
276    /// resolve, and return those resolved secrets, or the errors that occurred during resolving.
277    ///
278    /// # Configuration
279    ///
280    /// This method uses the existing configuration (see Caveats) to determine the secrets backend configuration. The
281    /// following configuration settings are used:
282    ///
283    /// - `secret_backend_command`: The executable to resolve secrets. (required)
284    /// - `secret_backend_timeout`: The timeout for the secrets backend command, in seconds. (optional, default: 30)
285    ///
286    /// # Usage
287    ///
288    /// For any value which should be resolved as a secret, the value should be a string in the format of
289    /// `ENC[secret_reference]`. The `secret_reference` portion is the value that will be sent to the backend command
290    /// during resolution. There is no limitation on the format of the `secret_reference` value, so long as it can be
291    /// expressed through the existing configuration sources (YAML, environment variables, etc).
292    ///
293    /// The entire configuration value must match this pattern, and cannot be used to replace only part of a value, so
294    /// values such as `db-ENC[secret_reference]` would not be detected as secrets and thus would not be resolved.
295    ///
296    /// # Protocol
297    ///
298    /// The executable is expected to accept a JSON object on stdin, with the following format:
299    ///
300    /// ```json
301    /// {
302    ///   "version": "1.0",
303    ///   "secrets": ["key1", "key2"]
304    /// }
305    /// ```
306    ///
    /// The executable is expected to return a JSON object on stdout, with the following format:
308    /// ```json
309    /// {
310    ///   "key1": {
311    ///     "value": "my_secret_password",
312    ///     "error": null
313    ///   },
314    ///   "key2": {
315    ///     "value": null,
316    ///     "error": "could not fetch the secret"
317    ///   }
318    /// }
319    /// ```
320    ///
321    /// If any entry in the response has an `error` value that is anything but `null`, the overall resolution will be
322    /// considered failed.
323    ///
324    /// # Caveats
325    ///
326    /// ## Time of resolution
327    ///
328    /// Secrets resolution happens at the time this method is called, and only resolves configuration values that are
329    /// already present in the configuration, which means all calls to load configuration (`try_from_yaml`,
330    /// `from_environment`, etc) must be made before calling this method.
331    ///
332    /// ## Sensitive data in error output
333    ///
334    /// Care should be taken to not return sensitive information in either the error output (standard error) of the
335    /// backend command or the `error` field in the JSON response, as these values are logged in order to aid debugging.
336    pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
337        let initial_figment = build_figment_from_sources(&self.provider_sources);
338
339        // If no secrets backend is set, we can't resolve secrets, so just return early.
340        if !initial_figment.contains("secret_backend_command") {
341            debug!("No secrets backend configured; skipping secrets resolution.");
342            return Ok(self);
343        }
344
345        let resolver_config = initial_figment.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
346        let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
347            .await
348            .context(Secrets)?;
349
350        let provider = secrets::Provider::new(resolver, &initial_figment)
351            .await
352            .context(Secrets)?;
353
354        self.provider_sources
355            .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
356        Ok(self)
357    }
358
359    /// Enables dynamic configuration.
360    ///
361    /// The receiver is used in `run_dynamic_config_updater` to handle retrieving the initial snapshot and subsequent updates.
362    pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
363        self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
364        self
365    }
366
367    /// Consumes the configuration loader, deserializing it as `T`.
368    ///
369    /// ## Errors
370    ///
371    /// If the configuration could not be deserialized into `T`, an error will be returned.
372    pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
373    where
374        T: Deserialize<'a>,
375    {
376        let figment = build_figment_from_sources(&self.provider_sources);
377        figment.extract().map_err(Into::into)
378    }
379
380    /// Creates a bootstrap `GenericConfiguration` without consuming the loader.
381    ///
382    /// This creates a static snapshot of the configuration loaded so far. As this is intended for bootstrapping
383    /// before dynamic configuration is active, the dynamic provider is ignored.
384    pub fn bootstrap_generic(&self) -> GenericConfiguration {
385        let figment = build_figment_from_sources(&self.provider_sources);
386
387        GenericConfiguration {
388            inner: Arc::new(Inner {
389                figment: RwLock::new(figment),
390                lookup_sources: self.lookup_sources.clone(),
391                event_sender: None,
392                ready_signal: Mutex::new(None),
393            }),
394        }
395    }
396
397    /// Consumes the configuration loader and wraps it in a generic wrapper.
398    pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
399        let has_dynamic_provider = self
400            .provider_sources
401            .iter()
402            .any(|s| matches!(s, ProviderSource::Dynamic(_)));
403
404        if has_dynamic_provider {
405            let mut receiver_opt = None;
406            for source in self.provider_sources.iter_mut() {
407                if let ProviderSource::Dynamic(ref mut receiver) = source {
408                    receiver_opt = receiver.take();
409                    break;
410                }
411            }
412            let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
413
414            // Build the initial figment object from the static providers. The dynamic provider is empty for now.
415            let figment = build_figment_from_sources(&self.provider_sources);
416
417            let (event_sender, _) = broadcast::channel(100);
418            let (ready_sender, ready_receiver) = oneshot::channel();
419
420            let generic_config = GenericConfiguration {
421                inner: Arc::new(Inner {
422                    figment: RwLock::new(figment),
423                    lookup_sources: self.lookup_sources,
424                    event_sender: Some(event_sender.clone()),
425                    ready_signal: Mutex::new(Some(ready_receiver)),
426                }),
427            };
428
429            // Spawn the background task to handle retrieving the initial snapshot and subsequent updates.
430            tokio::spawn(run_dynamic_config_updater(
431                generic_config.inner.clone(),
432                receiver,
433                self.provider_sources,
434                event_sender,
435                ready_sender,
436            ));
437
438            Ok(generic_config)
439        } else {
440            // Otherwise, just build the static configuration.
441            let figment = build_figment_from_sources(&self.provider_sources);
442
443            Ok(GenericConfiguration {
444                inner: Arc::new(Inner {
445                    figment: RwLock::new(figment),
446                    lookup_sources: self.lookup_sources,
447                    event_sender: None,
448                    ready_signal: Mutex::new(None),
449                }),
450            })
451        }
452    }
453
454    /// Configures a [`GenericConfiguration`] that is suitable for tests.
455    ///
456    /// This configures the loader with the following defaults:
457    ///
458    /// - configuration from a JSON file
459    /// - configuration from environment variables
460    ///
461    /// If `enable_dynamic_configuration` is true, a dynamic configuration sender is returned.
462    ///
463    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
464    pub async fn for_tests(
465        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
466        enable_dynamic_configuration: bool,
467    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
468        let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
469        let path = &json_file.path();
470        let json_to_write = file_values.unwrap_or(serde_json::json!({}));
471        serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
472
473        let mut loader = ConfigurationLoader::default().try_from_json(path);
474        let mut maybe_sender = None;
475        if enable_dynamic_configuration {
476            let (sender, receiver) = tokio::sync::mpsc::channel(1);
477            loader = loader.with_dynamic_configuration(receiver);
478            maybe_sender = Some(sender);
479        }
480
481        static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
482
483        let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
484
485        if let Some(pairs) = env_vars.as_ref() {
486            for (k, v) in pairs.iter() {
487                std::env::set_var(format!("TEST_{}", k), v);
488            }
489        }
490
491        // Add environment provider last so it has the highest precedence.
492        let loader = loader
493            .from_environment("TEST")
494            .expect("should not fail to add environment provider");
495
496        // Remove only the test-provided env vars after snapshotting.
497        if let Some(pairs) = env_vars.as_ref() {
498            for (k, _) in pairs.iter() {
499                std::env::remove_var(k);
500            }
501        }
502
503        drop(guard);
504
505        let cfg = loader
506            .into_generic()
507            .await
508            .expect("should not fail to build generic configuration");
509
510        (cfg, maybe_sender)
511    }
512}
513
514fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
515    sources.iter().fold(Figment::new(), |figment, source| match source {
516        ProviderSource::Static(p) => figment.admerge(p.clone()),
517        // No-op. The merging is handled by the updater task.
518        ProviderSource::Dynamic(_) => figment,
519    })
520}
521
522/// Inserts or updates a value for a key.
523///
524/// Intermediate objects are created if they don't exist.
525fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
526    if !root.is_object() {
527        *root = serde_json::Value::Object(serde_json::Map::new());
528    }
529
530    let mut current = root;
531    // Create a new node for each segment if the key is dotted.
532    let mut segments = key.split('.').peekable();
533
534    while let Some(seg) = segments.next() {
535        let is_leaf = segments.peek().is_none();
536
537        // Ensure current is an object before operating
538        if !current.is_object() {
539            *current = serde_json::Value::Object(serde_json::Map::new());
540        }
541        let node = current.as_object_mut().expect("current node should be an object");
542
543        if is_leaf {
544            node.insert(seg.to_string(), value);
545            break;
546        } else {
547            // Ensure child exists and is an object
548            let should_create_node = match node.get(seg) {
549                Some(v) => !v.is_object(),
550                None => true,
551            };
552            // Check if we need to create an intermediate node if it doesn't exist.
553            if should_create_node {
554                node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
555            }
556
557            // Advance the current node to the next level.
558            current = node.get_mut(seg).expect("should not fail to get nested object");
559        }
560    }
561}
562
563async fn run_dynamic_config_updater(
564    inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
565    sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
566) {
567    // The first message on the channel will be the initial snapshot.
568    let initial_update = match receiver.recv().await {
569        Some(update) => update,
570        None => {
571            // The channel was closed before we even received the initial snapshot.
572            debug!("Dynamic configuration channel closed before initial snapshot.");
573            return;
574        }
575    };
576
577    let mut dynamic_state = match initial_update {
578        ConfigUpdate::Snapshot(state) => state,
579        ConfigUpdate::Partial { .. } => {
580            // This is theoretically unreachable, as `configstream` should always send a snapshot first.
581            error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
582            serde_json::Value::Null
583        }
584    };
585
586    // Rebuild the configuration with the initial snapshot.
587    let new_figment = provider_sources
588        .iter()
589        .fold(Figment::new(), |figment, source| match source {
590            ProviderSource::Static(p) => figment.admerge(p.clone()),
591            ProviderSource::Dynamic(_) => {
592                figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
593            }
594        });
595
596    // Update the main figment object and then release the lock.
597    {
598        let mut figment_guard = inner.figment.write().unwrap();
599        *figment_guard = new_figment.clone();
600    }
601
602    // Signal that the initial snapshot has been processed and the configuration is ready.
603    if ready_sender.send(()).is_err() {
604        debug!("Configuration readiness receiver dropped. Updater task shutting down.");
605        return;
606    }
607
608    // Set our "current" state for the main loop.
609    let mut current_config: figment::value::Value = new_figment.extract().unwrap();
610
611    // Enter the main loop to process subsequent updates.
612    loop {
613        let update = match receiver.recv().await {
614            Some(update) => update,
615            None => {
616                // The sender was dropped, which means the config stream has terminated. We can exit.
617                debug!("Dynamic configuration update channel closed. Updater task shutting down.");
618                return;
619            }
620        };
621
622        // Update our local dynamic state based on the received message.
623        match update {
624            ConfigUpdate::Snapshot(new_state) => {
625                dynamic_state = new_state;
626            }
627            ConfigUpdate::Partial { key, value } => {
628                if dynamic_state.is_null() {
629                    dynamic_state = serde_json::Value::Object(serde_json::Map::new());
630                }
631                if dynamic_state.is_object() {
632                    upsert(&mut dynamic_state, &key, value);
633                } else {
634                    error!(
635                        "Received partial update but current dynamic state is not an object. This should not happen."
636                    );
637                }
638            }
639        }
640
641        // Rebuild the figment object on every update, respecting the original provider order.
642        let new_figment = provider_sources
643            .iter()
644            .fold(Figment::new(), |figment, source| match source {
645                ProviderSource::Static(p) => figment.admerge(p.clone()),
646                ProviderSource::Dynamic(_) => {
647                    figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
648                }
649            });
650
651        let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
652
653        if current_config != new_config {
654            for change in dynamic::diff_config(&current_config, &new_config) {
655                // Send the change event to any receivers of the dynamic handler.
656                // If there are no receivers, `send` will fail. This is expected and fine,
657                // so we can ignore the error to avoid log spam.
658                let _ = sender.send(change);
659            }
660
661            let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
662                error!("Failed to acquire write lock for dynamic configuration: {}", e);
663                e.into_inner()
664            });
665            *figment_guard = new_figment;
666
667            // Update our "current" state for the next iteration.
668            current_config = new_config;
669        }
670    }
671}
672
// Shared state behind a `GenericConfiguration`.
#[derive(Debug)]
struct Inner {
    // The merged configuration. Replaced wholesale by the dynamic updater task when an update changes it.
    figment: RwLock<Figment>,
    // Alternate lookup forms (environment variables) passed to `from_figment_error` when building errors.
    lookup_sources: HashSet<LookupSource>,
    // Broadcasts configuration change events; `None` when dynamic configuration is not enabled.
    event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
    // Receives the "initial snapshot applied" signal; holds `None` when dynamic configuration is not enabled, or once
    // a call to `ready()` has taken the receiver.
    ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
}
680
681/// A generic configuration object.
682///
683/// This represents the merged configuration derived from [`ConfigurationLoader`] in its raw form.  Values can be
684/// queried by key, and can be extracted either as typed values or in their raw form.
685///
/// Keys must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested value.
687///
688/// Using an example JSON configuration:
689///
690/// ```json
691/// {
692///   "a": {
693///     "b": {
694///       "c": "value"
695///     }
696///   }
697/// }
698/// ```
699///
700/// Querying for the value of `a.b.c` would return `"value"`, and querying for `a.b` would return the nested object `{
701/// "c": "value" }`.
#[derive(Clone, Debug)]
pub struct GenericConfiguration {
    // Shared state; clones of this configuration share the same underlying `Inner`.
    inner: Arc<Inner>,
}
706
707impl GenericConfiguration {
708    /// Waits for the configuration to be ready, if dynamic configuration is enabled.
709    ///
710    /// If dynamic configuration is in use, this method will asynchronously wait until the first snapshot has been
711    /// received and applied.
712    ///
713    /// If dynamic configuration is not used, it returns immediately.
714    pub async fn ready(&self) {
715        // We need a lock to both ensure that multiple callers can race against this,
716        // and to allow us mutable access to consume the receiver.
717        let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
718        if let Some(ready_rx) = maybe_ready_rx.take() {
719            // We're the first caller to wait for readiness.
720            if ready_rx.await.is_err() {
721                error!("Failed to receive configuration readiness signal; updater task may have panicked.");
722            }
723        }
724    }
725
726    fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
727    where
728        T: Deserialize<'a>,
729    {
730        let figment_guard = self.inner.figment.read().unwrap();
731        match figment_guard.extract_inner(key) {
732            Ok(value) => Ok(value),
733            Err(e) => {
734                if matches!(e.kind, figment::error::Kind::MissingField(_)) {
735                    // We might have been given a key that uses nested notation -- `foo.bar` -- but is only present in the
736                    // environment variables. We specifically don't want to use a different separator in environment
737                    // variables to map to nested key separators, so we simply try again here but with all nested key
738                    // separators (`.`) replaced with `_`, to match environment variables.
739                    let fallback_key = key.replace('.', "_");
740                    figment_guard
741                        .extract_inner(&fallback_key)
742                        .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
743                } else {
744                    Err(e.into())
745                }
746            }
747        }
748    }
749
750    /// Gets a configuration value by key.
751    ///
752    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
753    ///
754    /// ## Errors
755    ///
756    /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error
757    /// variant will be returned.
758    pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
759    where
760        T: Deserialize<'a>,
761    {
762        self.get(key)
763    }
764
765    /// Gets a configuration value by key, or the default value if a key does not exist or could not be deserialized.
766    ///
767    /// The `Default` implementation of `T` will be used both if the key could not be found, as well as for any error
768    /// during deserialization. This effectively swallows any errors and should generally be used sparingly.
769    ///
770    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
771    pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
772    where
773        T: Default + Deserialize<'a>,
774    {
775        self.get(key).unwrap_or_default()
776    }
777
778    /// Gets a configuration value by key, if it exists.
779    ///
780    /// If the key exists in the configuration, and can be deserialized, `Ok(Some(value))` is returned. Otherwise,
781    /// `Ok(None)` will be returned.
782    ///
783    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
784    ///
785    /// ## Errors
786    ///
787    /// If the value could not be deserialized into `T`, an error will be returned.
788    pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
789    where
790        T: Deserialize<'a>,
791    {
792        match self.get(key) {
793            Ok(value) => Ok(Some(value)),
794            Err(ConfigurationError::MissingField { .. }) => Ok(None),
795            Err(e) => Err(e),
796        }
797    }
798
799    /// Attempts to deserialize the entire configuration as `T`.
800    ///
801    /// ## Errors
802    ///
803    /// If the value could not be deserialized into `T`, an error will be returned.
804    pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
805    where
806        T: Deserialize<'a>,
807    {
808        self.inner
809            .figment
810            .read()
811            .unwrap()
812            .extract()
813            .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
814    }
815
816    /// Subscribes for updates to the configuration.
817    pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
818        self.inner.event_sender.as_ref().map(|s| s.subscribe())
819    }
820
821    /// Creates a watcher that yields only when the given key changes.
822    ///
823    /// If dynamic configuration is disabled, the returned watcher's `changed()`
824    /// will wait indefinitely.
825    pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
826        FieldUpdateWatcher {
827            key: key.to_string(),
828            rx: self.subscribe_for_updates(),
829        }
830    }
831}
832
833fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
834    match e.kind {
835        Kind::MissingField(field) => {
836            let mut valid_keys = lookup_sources
837                .iter()
838                .map(|source| source.transform_key(&field))
839                .collect::<Vec<_>>();
840
841            // Always specify the original key as a valid key to try.
842            valid_keys.insert(0, field.to_string());
843
844            let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
845
846            ConfigurationError::MissingField { help_text, field }
847        }
848        Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
849            field: e.path.join("."),
850            expected_ty,
851            actual_ty: actual_ty.to_string(),
852        },
853        _ => ConfigurationError::Generic { source: e.into() },
854    }
855}
856
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Returns the JSON document that most tests load as their base configuration.
    fn base_config() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Builds a partial dynamic update that sets `key` to `value`.
    fn partial(key: &str, value: serde_json::Value) -> ConfigUpdate {
        ConfigUpdate::Partial {
            key: key.to_string(),
            value,
        }
    }

    /// Consumes change events from `rx` until one for `key` arrives.
    ///
    /// Panics if the receiver reports an error, or with `timeout_msg` if no matching event shows up within two
    /// seconds.
    async fn wait_for_key(rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str, timeout_msg: &str) {
        let matching_event = async {
            loop {
                let ev = match rx.recv().await {
                    Ok(ev) => ev,
                    Err(e) => panic!("updates channel closed: {e}"),
                };
                if ev.key == key {
                    break;
                }
            }
        };

        tokio::time::timeout(Duration::from_secs(2), matching_event)
            .await
            .expect(timeout_msg);
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(base_config()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        // Values from the JSON document, including a nested lookup.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());

        // Value supplied through the environment.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        // A key that exists nowhere is reported as a missing field.
        let missing = cfg.get::<String>("nonexistentKey");
        assert!(matches!(missing, Err(ConfigurationError::MissingField { .. })));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(base_config()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        // The initial snapshot has to be sent before `ready()` is awaited.
        let snapshot = ConfigUpdate::Snapshot(serde_json::json!({
            "new": "from_snapshot",
        }));
        sender.send(snapshot).await.unwrap();
        cfg.ready().await;

        // Pre-existing values survive the snapshot.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Values introduced by the snapshot are visible.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // A partial update for a brand new top-level key.
        sender
            .send(partial("new_key", "from dynamic update".to_string().into()))
            .await
            .unwrap();
        wait_for_key(&mut rx, "new_key", "timed out waiting for new_key update").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // A partial update addressed to a nested key is applied, leaving its sibling untouched.
        sender
            .send(partial("foobar.a", serde_json::json!(true)))
            .await
            .unwrap();
        wait_for_key(&mut rx, "foobar.a", "timed out waiting for foobar.a update").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(base_config()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        let snapshot = ConfigUpdate::Snapshot(serde_json::json!({
            "env_var": "from_snapshot_env_var"
        }));
        sender.send(snapshot).await.unwrap();
        cfg.ready().await;

        // Env provider has highest precedence so the snapshot should not override it.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // Try to override the env-backed key with a partial update.
        sender
            .send(partial("env_var", serde_json::json!("from_partial")))
            .await
            .unwrap();

        // Also push a partial update for a nested key. Note that this test sets no environment variable for it
        // and makes no assertion on its value afterwards.
        sender
            .send(partial("foobar.a", serde_json::json!(false)))
            .await
            .unwrap();

        // Send a dummy partial update as a sync marker: once it is observed, the earlier partials have been
        // processed by the updater.
        sender.send(partial("dummy", serde_json::json!(1))).await.unwrap();
        wait_for_key(&mut rx, "dummy", "timed out waiting for sync marker").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(base_config()), None, true).await;
        let sender = sender.expect("sender should exist");

        let empty_snapshot = ConfigUpdate::Snapshot(serde_json::json!({}));
        sender.send(empty_snapshot).await.unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(partial("new_parent.new_child", serde_json::json!(42)))
            .await
            .unwrap();

        // new_parent object did not exist before, so the diff will emit the object "new_parent"
        wait_for_key(
            &mut rx,
            "new_parent",
            "timed out waiting for new_parent.new_child update",
        )
        .await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({})),
            Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        // The dotted key is absent from the (empty) document; the lookup still resolves to the value that was
        // supplied only through the `RANDOM_KEY` environment variable.
        let resolved = cfg.get_typed::<String>("random.key").unwrap();
        assert_eq!(resolved, "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        // With dynamic configuration disabled, `ready()` must resolve promptly.
        let ready_outcome = tokio::time::timeout(Duration::from_millis(500), cfg.ready()).await;
        ready_outcome.expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        // Enable dynamic but do not send the initial snapshot.
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        assert!(maybe_sender.is_some());

        // ready() should not resolve until the initial snapshot is processed.
        let ready_outcome = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(
            ready_outcome.is_err(),
            "ready() should time out without an initial snapshot"
        );
    }
}