saluki_config/
lib.rs

1//! Primitives for working with typed and untyped configuration data.
2#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10    error::Kind,
11    providers::{Env, Serialized},
12    Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32    fn metadata(&self) -> figment::Metadata {
33        self.0.metadata()
34    }
35
36    fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37        self.0.data()
38    }
39}
40
41enum ProviderSource {
42    Static(ArcProvider),
43    Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47    fn clone(&self) -> Self {
48        match self {
49            Self::Static(p) => Self::Static(p.clone()),
50            Self::Dynamic(_) => Self::Dynamic(None),
51        }
52    }
53}
54
55/// A configuration error.
56#[derive(Debug, Snafu)]
57#[snafu(context(suffix(false)))]
58pub enum ConfigurationError {
59    /// Environment variable prefix was empty.
60    #[snafu(display("Environment variable prefix must not be empty."))]
61    EmptyPrefix,
62
63    /// Requested field was missing from the configuration.
64    #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
65    MissingField {
66        /// Help text describing how to set the missing field.
67        ///
68        /// This is meant to be displayed to the user, and includes environment variable-specific text if environment
69        /// variables had been loaded originally.
70        help_text: String,
71
72        /// Name of the missing field.
73        field: Cow<'static, str>,
74    },
75
76    /// Requested field's data type was not the unexpected data type.
77    #[snafu(display(
78        "Expected value for field '{}' to be '{}', got '{}' instead.",
79        field,
80        expected_ty,
81        actual_ty
82    ))]
83    InvalidFieldType {
84        /// Name of the invalid field.
85        ///
86        /// This is a period-separated path to the field.
87        field: String,
88
89        /// Expected data type.
90        expected_ty: String,
91
92        /// Actual data type.
93        actual_ty: String,
94    },
95
96    /// Generic configuration error.
97    #[snafu(transparent)]
98    Generic {
99        /// Error source.
100        source: GenericError,
101    },
102
103    /// Secrets resolution error.
104    #[snafu(display("Failed to resolve secrets."))]
105    Secrets {
106        /// Error source.
107        source: secrets::Error,
108    },
109}
110
111impl From<figment::Error> for ConfigurationError {
112    fn from(e: figment::Error) -> Self {
113        match e.kind {
114            Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115                field: e.path.join("."),
116                expected_ty,
117                actual_ty: actual_ty.to_string(),
118            },
119            _ => Self::Generic { source: e.into() },
120        }
121    }
122}
123
124#[derive(Clone, Debug, Eq, Hash, PartialEq)]
125enum LookupSource {
126    /// The configuration key is looked up in a form suitable for environment variables.
127    Environment { prefix: String },
128}
129
130impl LookupSource {
131    fn transform_key(&self, key: &str) -> String {
132        match self {
133            // The prefix should already be uppercased, with a trailing underscore, which is needed when we actually
134            // configure the provider used for reading from the environment... so we don't need to re-do that here.
135            LookupSource::Environment { prefix } => format!("{}{}", prefix, key.replace('.', "_").to_uppercase()),
136        }
137    }
138}
139
140/// A configuration loader that can pull from various sources.
141///
142/// This loader provides a wrapper around a lower-level library, `figment`, to expose a simpler and focused API for both
143/// loading configuration data from various sources, as well as querying it.
144///
145/// A variety of configuration sources can be configured (see below), with an implicit priority based on the order in
146/// which sources are added: sources added later take precedence over sources prior. Additionally, either a typed value
147/// can be extracted from the configuration ([`into_typed`][Self::into_typed]), or the raw configuration data can be
148/// accessed via a generic API ([`into_generic`][Self::into_generic]).
149///
150/// # Supported sources
151///
152/// - YAML file
153/// - JSON file
154/// - environment variables (must be prefixed; see [`from_environment`][Self::from_environment])
155#[derive(Clone, Default)]
156pub struct ConfigurationLoader {
157    lookup_sources: HashSet<LookupSource>,
158    provider_sources: Vec<ProviderSource>,
159}
160
161impl ConfigurationLoader {
162    /// Loads the given YAML configuration file.
163    ///
164    /// # Errors
165    ///
166    /// If the file could not be read, or if the file is not valid YAML, an error will be returned.
167    pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
168    where
169        P: AsRef<std::path::Path>,
170    {
171        let resolved_provider = ResolvedProvider::from_yaml(&path)?;
172        self.provider_sources
173            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
174        Ok(self)
175    }
176
177    /// Attempts to load the given YAML configuration file, ignoring any errors.
178    ///
179    /// Errors include the file not existing, not being readable/accessible, and not being valid YAML.
180    pub fn try_from_yaml<P>(mut self, path: P) -> Self
181    where
182        P: AsRef<std::path::Path>,
183    {
184        match ResolvedProvider::from_yaml(&path) {
185            Ok(resolved_provider) => {
186                self.provider_sources
187                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
188            }
189            Err(e) => {
190                println!(
191                    "Unable to read YAML configuration file '{}': {}. Ignoring.",
192                    path.as_ref().to_string_lossy(),
193                    e
194                );
195            }
196        }
197        self
198    }
199
200    /// Loads the given JSON configuration file.
201    ///
202    /// # Errors
203    ///
204    /// If the file could not be read, or if the file is not valid JSON, an error will be returned.
205    pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
206    where
207        P: AsRef<std::path::Path>,
208    {
209        let resolved_provider = ResolvedProvider::from_json(&path)?;
210        self.provider_sources
211            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
212        Ok(self)
213    }
214
215    /// Attempts to load the given JSON configuration file, ignoring any errors.
216    ///
217    /// Errors include the file not existing, not being readable/accessible, and not being valid JSON.
218    pub fn try_from_json<P>(mut self, path: P) -> Self
219    where
220        P: AsRef<std::path::Path>,
221    {
222        match ResolvedProvider::from_json(&path) {
223            Ok(resolved_provider) => {
224                self.provider_sources
225                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
226            }
227            Err(e) => {
228                println!(
229                    "Unable to read JSON configuration file '{}': {}. Ignoring.",
230                    path.as_ref().to_string_lossy(),
231                    e
232                );
233            }
234        }
235        self
236    }
237
238    /// Loads configuration from environment variables.
239    ///
240    /// The prefix given will have an underscore appended to it if it does not already end with one. For example, with a
241    /// prefix of `app`, any environment variable starting with `app_` would be matched.
242    ///
243    /// The prefix is case-insensitive.
244    ///
245    /// # Errors
246    ///
247    /// If the prefix is empty, an error will be returned.
248    pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
249        if prefix.is_empty() {
250            return Err(ConfigurationError::EmptyPrefix);
251        }
252
253        let prefix = if prefix.ends_with('_') {
254            prefix.to_string()
255        } else {
256            format!("{}_", prefix)
257        };
258
259        // Convert to use Serialized::defaults since, Env isn't Send + Sync
260        let env = Env::prefixed(&prefix).split("__");
261        let values = env.data().unwrap();
262        if let Some(default_dict) = values.get(&figment::Profile::Default) {
263            self.provider_sources
264                .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
265                    default_dict.clone(),
266                )))));
267            self.lookup_sources.insert(LookupSource::Environment { prefix });
268        }
269        Ok(self)
270    }
271
272    /// Resolves secrets in the configuration based on available secret backend configuration.
273    ///
274    /// This will attempt to resolve any secret references (format shown below) in the configuration by using a "secrets
275    /// backend", which is a user-provided command that utilizes a simple JSON-based protocol to accept secrets to
276    /// resolve, and return those resolved secrets, or the errors that occurred during resolving.
277    ///
278    /// # Configuration
279    ///
280    /// This method uses the existing configuration (see Caveats) to determine the secrets backend configuration. The
281    /// following configuration settings are used:
282    ///
283    /// - `secret_backend_command`: The executable to resolve secrets. (required)
284    /// - `secret_backend_timeout`: The timeout for the secrets backend command, in seconds. (optional, default: 30)
285    ///
286    /// # Usage
287    ///
288    /// For any value which should be resolved as a secret, the value should be a string in the format of
289    /// `ENC[secret_reference]`. The `secret_reference` portion is the value that will be sent to the backend command
290    /// during resolution. There is no limitation on the format of the `secret_reference` value, so long as it can be
291    /// expressed through the existing configuration sources (YAML, environment variables, etc).
292    ///
293    /// The entire configuration value must match this pattern, and cannot be used to replace only part of a value, so
294    /// values such as `db-ENC[secret_reference]` would not be detected as secrets and thus would not be resolved.
295    ///
296    /// # Protocol
297    ///
298    /// The executable is expected to accept a JSON object on stdin, with the following format:
299    ///
300    /// ```json
301    /// {
302    ///   "version": "1.0",
303    ///   "secrets": ["key1", "key2"]
304    /// }
305    /// ```
306    ///
307    /// The executable is expected return a JSON object on stdout, with the following format:
308    /// ```json
309    /// {
310    ///   "key1": {
311    ///     "value": "my_secret_password",
312    ///     "error": null
313    ///   },
314    ///   "key2": {
315    ///     "value": null,
316    ///     "error": "could not fetch the secret"
317    ///   }
318    /// }
319    /// ```
320    ///
321    /// If any entry in the response has an `error` value that is anything but `null`, the overall resolution will be
322    /// considered failed.
323    ///
324    /// # Caveats
325    ///
326    /// ## Time of resolution
327    ///
328    /// Secrets resolution happens at the time this method is called, and only resolves configuration values that are
329    /// already present in the configuration, which means all calls to load configuration (`try_from_yaml`,
330    /// `from_environment`, etc) must be made before calling this method.
331    ///
332    /// ## Sensitive data in error output
333    ///
334    /// Care should be taken to not return sensitive information in either the error output (standard error) of the
335    /// backend command or the `error` field in the JSON response, as these values are logged in order to aid debugging.
336    pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
337        let initial_figment = build_figment_from_sources(&self.provider_sources);
338
339        // If no secrets backend is set, we can't resolve secrets, so just return early.
340        if !initial_figment.contains("secret_backend_command") {
341            debug!("No secrets backend configured; skipping secrets resolution.");
342            return Ok(self);
343        }
344
345        let resolver_config = initial_figment.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
346        let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
347            .await
348            .context(Secrets)?;
349
350        let provider = secrets::Provider::new(resolver, &initial_figment)
351            .await
352            .context(Secrets)?;
353
354        self.provider_sources
355            .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
356        Ok(self)
357    }
358
359    /// Enables dynamic configuration.
360    ///
361    /// The receiver is used in `run_dynamic_config_updater` to handle retrieving the initial snapshot and subsequent updates.
362    pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
363        self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
364        self
365    }
366
367    /// Consumes the configuration loader, deserializing it as `T`.
368    ///
369    /// ## Errors
370    ///
371    /// If the configuration could not be deserialized into `T`, an error will be returned.
372    pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
373    where
374        T: Deserialize<'a>,
375    {
376        let figment = build_figment_from_sources(&self.provider_sources);
377        figment.extract().map_err(Into::into)
378    }
379
380    /// Creates a bootstrap `GenericConfiguration` without consuming the loader.
381    ///
382    /// This creates a static snapshot of the configuration loaded so far. As this is intended for bootstrapping
383    /// before dynamic configuration is active, the dynamic provider is ignored.
384    pub fn bootstrap_generic(&self) -> GenericConfiguration {
385        let figment = build_figment_from_sources(&self.provider_sources);
386
387        GenericConfiguration {
388            inner: Arc::new(Inner {
389                figment: RwLock::new(figment),
390                lookup_sources: self.lookup_sources.clone(),
391                event_sender: None,
392                ready_signal: Mutex::new(None),
393            }),
394        }
395    }
396
397    /// Consumes the configuration loader and wraps it in a generic wrapper.
398    pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
399        let has_dynamic_provider = self
400            .provider_sources
401            .iter()
402            .any(|s| matches!(s, ProviderSource::Dynamic(_)));
403
404        if has_dynamic_provider {
405            let mut receiver_opt = None;
406            for source in self.provider_sources.iter_mut() {
407                if let ProviderSource::Dynamic(ref mut receiver) = source {
408                    receiver_opt = receiver.take();
409                    break;
410                }
411            }
412            let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
413
414            // Build the initial figment object from the static providers. The dynamic provider is empty for now.
415            let figment = build_figment_from_sources(&self.provider_sources);
416
417            let (event_sender, _) = broadcast::channel(100);
418            let (ready_sender, ready_receiver) = oneshot::channel();
419
420            let generic_config = GenericConfiguration {
421                inner: Arc::new(Inner {
422                    figment: RwLock::new(figment),
423                    lookup_sources: self.lookup_sources,
424                    event_sender: Some(event_sender.clone()),
425                    ready_signal: Mutex::new(Some(ready_receiver)),
426                }),
427            };
428
429            // Spawn the background task to handle retrieving the initial snapshot and subsequent updates.
430            tokio::spawn(run_dynamic_config_updater(
431                generic_config.inner.clone(),
432                receiver,
433                self.provider_sources,
434                event_sender,
435                ready_sender,
436            ));
437
438            Ok(generic_config)
439        } else {
440            // Otherwise, just build the static configuration.
441            let figment = build_figment_from_sources(&self.provider_sources);
442
443            Ok(GenericConfiguration {
444                inner: Arc::new(Inner {
445                    figment: RwLock::new(figment),
446                    lookup_sources: self.lookup_sources,
447                    event_sender: None,
448                    ready_signal: Mutex::new(None),
449                }),
450            })
451        }
452    }
453
454    /// Configures a [`GenericConfiguration`] that is suitable for tests.
455    ///
456    /// This configures the loader with the following defaults:
457    ///
458    /// - configuration from a JSON file
459    /// - configuration from environment variables
460    ///
461    /// If `enable_dynamic_configuration` is true, a dynamic configuration sender is returned.
462    ///
463    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
464    pub async fn for_tests(
465        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
466        enable_dynamic_configuration: bool,
467    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
468        let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
469        let path = &json_file.path();
470        let json_to_write = file_values.unwrap_or(serde_json::json!({}));
471        serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
472
473        let mut loader = ConfigurationLoader::default().try_from_json(path);
474        let mut maybe_sender = None;
475        if enable_dynamic_configuration {
476            let (sender, receiver) = tokio::sync::mpsc::channel(1);
477            loader = loader.with_dynamic_configuration(receiver);
478            maybe_sender = Some(sender);
479        }
480
481        static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
482
483        let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
484
485        if let Some(pairs) = env_vars.as_ref() {
486            for (k, v) in pairs.iter() {
487                std::env::set_var(format!("TEST_{}", k), v);
488            }
489        }
490
491        // Add environment provider last so it has the highest precedence.
492        let loader = loader
493            .from_environment("TEST")
494            .expect("should not fail to add environment provider");
495
496        // Remove only the test-provided env vars after snapshotting.
497        if let Some(pairs) = env_vars.as_ref() {
498            for (k, _) in pairs.iter() {
499                std::env::remove_var(k);
500            }
501        }
502
503        drop(guard);
504
505        let cfg = loader
506            .into_generic()
507            .await
508            .expect("should not fail to build generic configuration");
509
510        (cfg, maybe_sender)
511    }
512}
513
514fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
515    sources.iter().fold(Figment::new(), |figment, source| match source {
516        ProviderSource::Static(p) => figment.admerge(p.clone()),
517        // No-op. The merging is handled by the updater task.
518        ProviderSource::Dynamic(_) => figment,
519    })
520}
521
522/// Inserts or updates a value for a key.
523///
524/// Intermediate objects are created if they don't exist.
525fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
526    if !root.is_object() {
527        *root = serde_json::Value::Object(serde_json::Map::new());
528    }
529
530    let mut current = root;
531    // Create a new node for each segment if the key is dotted.
532    let mut segments = key.split('.').peekable();
533
534    while let Some(seg) = segments.next() {
535        let is_leaf = segments.peek().is_none();
536
537        // Ensure current is an object before operating
538        if !current.is_object() {
539            *current = serde_json::Value::Object(serde_json::Map::new());
540        }
541        let node = current.as_object_mut().expect("current node should be an object");
542
543        if is_leaf {
544            node.insert(seg.to_string(), value);
545            break;
546        } else {
547            // Ensure child exists and is an object
548            let should_create_node = match node.get(seg) {
549                Some(v) => !v.is_object(),
550                None => true,
551            };
552            // Check if we need to create an intermediate node if it doesn't exist.
553            if should_create_node {
554                node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
555            }
556
557            // Advance the current node to the next level.
558            current = node.get_mut(seg).expect("should not fail to get nested object");
559        }
560    }
561}
562
563async fn run_dynamic_config_updater(
564    inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
565    sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
566) {
567    // The first message on the channel will be the initial snapshot.
568    let initial_update = match receiver.recv().await {
569        Some(update) => update,
570        None => {
571            // The channel was closed before we even received the initial snapshot.
572            debug!("Dynamic configuration channel closed before initial snapshot.");
573            return;
574        }
575    };
576
577    let mut dynamic_state = match initial_update {
578        ConfigUpdate::Snapshot(state) => state,
579        ConfigUpdate::Partial { .. } => {
580            // This is theoretically unreachable, as `configstream` should always send a snapshot first.
581            error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
582            serde_json::Value::Null
583        }
584    };
585
586    // Rebuild the configuration with the initial snapshot.
587    let new_figment = provider_sources
588        .iter()
589        .fold(Figment::new(), |figment, source| match source {
590            ProviderSource::Static(p) => figment.admerge(p.clone()),
591            ProviderSource::Dynamic(_) => {
592                figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
593            }
594        });
595
596    // Update the main figment object and then release the lock.
597    {
598        let mut figment_guard = inner.figment.write().unwrap();
599        *figment_guard = new_figment.clone();
600    }
601
602    // Signal that the initial snapshot has been processed and the configuration is ready.
603    if ready_sender.send(()).is_err() {
604        debug!("Configuration readiness receiver dropped. Updater task shutting down.");
605        return;
606    }
607
608    // Set our "current" state for the main loop.
609    let mut current_config: figment::value::Value = new_figment.extract().unwrap();
610
611    // Enter the main loop to process subsequent updates.
612    loop {
613        let update = match receiver.recv().await {
614            Some(update) => update,
615            None => {
616                // The sender was dropped, which means the config stream has terminated. We can exit.
617                debug!("Dynamic configuration update channel closed. Updater task shutting down.");
618                return;
619            }
620        };
621
622        // Update our local dynamic state based on the received message.
623        match update {
624            ConfigUpdate::Snapshot(new_state) => {
625                debug!("Received configuration snapshot update.");
626                dynamic_state = new_state;
627            }
628            ConfigUpdate::Partial { key, value } => {
629                debug!(%key, "Received partial configuration update.");
630                if dynamic_state.is_null() {
631                    dynamic_state = serde_json::Value::Object(serde_json::Map::new());
632                }
633                if dynamic_state.is_object() {
634                    upsert(&mut dynamic_state, &key, value);
635                } else {
636                    error!(
637                        "Received partial update but current dynamic state is not an object. This should not happen."
638                    );
639                }
640            }
641        }
642
643        // Rebuild the figment object on every update, respecting the original provider order.
644        let new_figment = provider_sources
645            .iter()
646            .fold(Figment::new(), |figment, source| match source {
647                ProviderSource::Static(p) => figment.admerge(p.clone()),
648                ProviderSource::Dynamic(_) => {
649                    figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
650                }
651            });
652
653        let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
654
655        if current_config != new_config {
656            for change in dynamic::diff_config(&current_config, &new_config) {
657                // Send the change event to any receivers of the dynamic handler.
658                // If there are no receivers, `send` will fail. This is expected and fine,
659                // so we can ignore the error to avoid log spam.
660                let _ = sender.send(change);
661            }
662
663            let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
664                error!("Failed to acquire write lock for dynamic configuration: {}", e);
665                e.into_inner()
666            });
667            *figment_guard = new_figment;
668
669            // Update our "current" state for the next iteration.
670            current_config = new_config;
671        }
672    }
673}
674
675#[derive(Debug)]
676struct Inner {
677    figment: RwLock<Figment>,
678    lookup_sources: HashSet<LookupSource>,
679    event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
680    ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
681}
682
683/// A generic configuration object.
684///
685/// This represents the merged configuration derived from [`ConfigurationLoader`] in its raw form.  Values can be
686/// queried by key, and can be extracted either as typed values or in their raw form.
687///
688/// Keys must be in the form of `a.b.c`, where periods (`.`) as used to indicate a nested value.
689///
690/// Using an example JSON configuration:
691///
692/// ```json
693/// {
694///   "a": {
695///     "b": {
696///       "c": "value"
697///     }
698///   }
699/// }
700/// ```
701///
702/// Querying for the value of `a.b.c` would return `"value"`, and querying for `a.b` would return the nested object `{
703/// "c": "value" }`.
704#[derive(Clone, Debug)]
705pub struct GenericConfiguration {
706    inner: Arc<Inner>,
707}
708
709impl GenericConfiguration {
710    /// Waits for the configuration to be ready, if dynamic configuration is enabled.
711    ///
712    /// If dynamic configuration is in use, this method will asynchronously wait until the first snapshot has been
713    /// received and applied.
714    ///
715    /// If dynamic configuration is not used, it returns immediately.
716    pub async fn ready(&self) {
717        // We need a lock to both ensure that multiple callers can race against this,
718        // and to allow us mutable access to consume the receiver.
719        let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
720        if let Some(ready_rx) = maybe_ready_rx.take() {
721            // We're the first caller to wait for readiness.
722            if ready_rx.await.is_err() {
723                error!("Failed to receive configuration readiness signal; updater task may have panicked.");
724            }
725        }
726    }
727
728    fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
729    where
730        T: Deserialize<'a>,
731    {
732        let figment_guard = self.inner.figment.read().unwrap();
733        match figment_guard.extract_inner(key) {
734            Ok(value) => Ok(value),
735            Err(e) => {
736                if matches!(e.kind, figment::error::Kind::MissingField(_)) {
737                    // We might have been given a key that uses nested notation -- `foo.bar` -- but is only present in the
738                    // environment variables. We specifically don't want to use a different separator in environment
739                    // variables to map to nested key separators, so we simply try again here but with all nested key
740                    // separators (`.`) replaced with `_`, to match environment variables.
741                    let fallback_key = key.replace('.', "_");
742                    figment_guard
743                        .extract_inner(&fallback_key)
744                        .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
745                } else {
746                    Err(e.into())
747                }
748            }
749        }
750    }
751
752    /// Gets a configuration value by key.
753    ///
754    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
755    ///
756    /// ## Errors
757    ///
758    /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error
759    /// variant will be returned.
760    pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
761    where
762        T: Deserialize<'a>,
763    {
764        self.get(key)
765    }
766
767    /// Gets a configuration value by key, or the default value if a key does not exist or could not be deserialized.
768    ///
769    /// The `Default` implementation of `T` will be used both if the key could not be found, as well as for any error
770    /// during deserialization. This effectively swallows any errors and should generally be used sparingly.
771    ///
772    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
773    pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
774    where
775        T: Default + Deserialize<'a>,
776    {
777        self.get(key).unwrap_or_default()
778    }
779
780    /// Gets a configuration value by key, if it exists.
781    ///
782    /// If the key exists in the configuration, and can be deserialized, `Ok(Some(value))` is returned. Otherwise,
783    /// `Ok(None)` will be returned.
784    ///
785    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
786    ///
787    /// ## Errors
788    ///
789    /// If the value could not be deserialized into `T`, an error will be returned.
790    pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
791    where
792        T: Deserialize<'a>,
793    {
794        match self.get(key) {
795            Ok(value) => Ok(Some(value)),
796            Err(ConfigurationError::MissingField { .. }) => Ok(None),
797            Err(e) => Err(e),
798        }
799    }
800
801    /// Attempts to deserialize the entire configuration as `T`.
802    ///
803    /// ## Errors
804    ///
805    /// If the value could not be deserialized into `T`, an error will be returned.
806    pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
807    where
808        T: Deserialize<'a>,
809    {
810        self.inner
811            .figment
812            .read()
813            .unwrap()
814            .extract()
815            .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
816    }
817
818    /// Subscribes for updates to the configuration.
819    pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
820        self.inner.event_sender.as_ref().map(|s| s.subscribe())
821    }
822
823    /// Creates a watcher that yields only when the given key changes.
824    ///
825    /// If dynamic configuration is disabled, the returned watcher's `changed()`
826    /// will wait indefinitely.
827    pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
828        FieldUpdateWatcher {
829            key: key.to_string(),
830            rx: self.subscribe_for_updates(),
831        }
832    }
833}
834
835fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
836    match e.kind {
837        Kind::MissingField(field) => {
838            let mut valid_keys = lookup_sources
839                .iter()
840                .map(|source| source.transform_key(&field))
841                .collect::<Vec<_>>();
842
843            // Always specify the original key as a valid key to try.
844            valid_keys.insert(0, field.to_string());
845
846            let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
847
848            ConfigurationError::MissingField { help_text, field }
849        }
850        Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
851            field: e.path.join("."),
852            expected_ty,
853            actual_ty: actual_ty.to_string(),
854        },
855        _ => ConfigurationError::Generic { source: e.into() },
856    }
857}
858
859#[cfg(test)]
860mod tests {
861    use super::*;
862
863    #[tokio::test]
864    async fn test_static_configuration() {
865        let (cfg, _) = ConfigurationLoader::for_tests(
866            Some(serde_json::json!({
867                "foo": "bar",
868                "baz": 5,
869                "foobar": { "a": false, "b": "c" }
870            })),
871            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
872            false,
873        )
874        .await;
875        cfg.ready().await;
876
877        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
878        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
879        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
880        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
881        assert!(matches!(
882            cfg.get::<String>("nonexistentKey"),
883            Err(ConfigurationError::MissingField { .. })
884        ));
885    }
886
887    #[tokio::test]
888    async fn test_dynamic_configuration() {
889        let (cfg, sender) = ConfigurationLoader::for_tests(
890            Some(serde_json::json!({
891                "foo": "bar",
892                "baz": 5,
893                "foobar": { "a": false, "b": "c" }
894            })),
895            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
896            true,
897        )
898        .await;
899        let sender = sender.expect("sender should exist");
900        sender
901            .send(ConfigUpdate::Snapshot(serde_json::json!({
902                "new": "from_snapshot",
903            })))
904            .await
905            .unwrap();
906
907        cfg.ready().await;
908
909        // Test that existing values still exist.
910        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
911
912        // Test that new values from the snapshot exist.
913        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");
914
915        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
916
917        sender
918            .send(ConfigUpdate::Partial {
919                key: "new_key".to_string(),
920                value: "from dynamic update".to_string().into(),
921            })
922            .await
923            .unwrap();
924
925        tokio::time::timeout(std::time::Duration::from_secs(2), async {
926            loop {
927                match rx.recv().await {
928                    Ok(ev) if ev.key == "new_key" => break ev,
929                    Err(e) => panic!("updates channel closed: {e}"),
930                    Ok(_) => continue,
931                }
932            }
933        })
934        .await
935        .expect("timed out waiting for new_key update");
936
937        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");
938
939        // Test that an update with a nested key is applied.
940        sender
941            .send(ConfigUpdate::Partial {
942                key: "foobar.a".to_string(),
943                value: serde_json::json!(true),
944            })
945            .await
946            .unwrap();
947
948        tokio::time::timeout(std::time::Duration::from_secs(2), async {
949            loop {
950                match rx.recv().await {
951                    Ok(ev) if ev.key == "foobar.a" => break ev,
952                    Err(e) => panic!("updates channel closed: {e}"),
953                    Ok(_) => continue,
954                }
955            }
956        })
957        .await
958        .expect("timed out waiting for foobar.a update");
959
960        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
961        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
962    }
963
964    #[tokio::test]
965    async fn test_environment_precedence_over_dynamic() {
966        let (cfg, sender) = ConfigurationLoader::for_tests(
967            Some(serde_json::json!({
968                "foo": "bar",
969                "baz": 5,
970                "foobar": { "a": false, "b": "c" }
971            })),
972            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
973            true,
974        )
975        .await;
976        let sender = sender.expect("sender should exist");
977
978        sender
979            .send(ConfigUpdate::Snapshot(serde_json::json!({
980                "env_var": "from_snapshot_env_var"
981            })))
982            .await
983            .unwrap();
984
985        cfg.ready().await;
986
987        // Env provider has highest precedence so the snapshot should not override it.
988        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
989
990        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
991
992        // Send a partial update that attempts to override the env-backed key.
993        sender
994            .send(ConfigUpdate::Partial {
995                key: "env_var".to_string(),
996                value: serde_json::json!("from_partial"),
997            })
998            .await
999            .unwrap();
1000
1001        // Also attempt to override the nested env-backed key via dynamic.
1002        sender
1003            .send(ConfigUpdate::Partial {
1004                key: "foobar.a".to_string(),
1005                value: serde_json::json!(false),
1006            })
1007            .await
1008            .unwrap();
1009
1010        // Send a dummy partial update to ensure the updater has processed prior partials.
1011        sender
1012            .send(ConfigUpdate::Partial {
1013                key: "dummy".to_string(),
1014                value: serde_json::json!(1),
1015            })
1016            .await
1017            .unwrap();
1018
1019        tokio::time::timeout(std::time::Duration::from_secs(2), async {
1020            loop {
1021                match rx.recv().await {
1022                    Ok(ev) if ev.key == "dummy" => break,
1023                    Err(e) => panic!("updates channel closed: {e}"),
1024                    Ok(_) => continue,
1025                }
1026            }
1027        })
1028        .await
1029        .expect("timed out waiting for sync marker");
1030
1031        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
1032    }
1033
1034    #[tokio::test]
1035    async fn test_dynamic_configuration_add_new_nested_key() {
1036        let (cfg, sender) = ConfigurationLoader::for_tests(
1037            Some(serde_json::json!({
1038                "foo": "bar",
1039                "baz": 5,
1040                "foobar": { "a": false, "b": "c" }
1041            })),
1042            None,
1043            true,
1044        )
1045        .await;
1046        let sender = sender.expect("sender should exist");
1047
1048        sender
1049            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
1050            .await
1051            .unwrap();
1052        cfg.ready().await;
1053
1054        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");
1055
1056        sender
1057            .send(ConfigUpdate::Partial {
1058                key: "new_parent.new_child".to_string(),
1059                value: serde_json::json!(42),
1060            })
1061            .await
1062            .unwrap();
1063
1064        // new_parent object did not exist before, so the diff will emit the object "new_parent"
1065        tokio::time::timeout(std::time::Duration::from_secs(2), async {
1066            loop {
1067                match rx.recv().await {
1068                    Ok(ev) if ev.key == "new_parent" => break ev,
1069                    Err(e) => panic!("updates channel closed: {e}"),
1070                    Ok(_) => continue,
1071                }
1072            }
1073        })
1074        .await
1075        .expect("timed out waiting for new_parent.new_child update");
1076
1077        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
1078    }
1079
1080    #[tokio::test]
1081    async fn test_underscore_fallback_on_get() {
1082        let (cfg, _) = ConfigurationLoader::for_tests(
1083            Some(serde_json::json!({})),
1084            Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
1085            false,
1086        )
1087        .await;
1088        cfg.ready().await;
1089
1090        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
1091    }
1092
1093    #[tokio::test]
1094    async fn test_static_configuration_ready_and_subscribe() {
1095        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
1096        assert!(maybe_sender.is_none());
1097
1098        tokio::time::timeout(std::time::Duration::from_millis(500), cfg.ready())
1099            .await
1100            .expect("ready() should not block when dynamic is disabled");
1101
1102        assert!(cfg.subscribe_for_updates().is_none());
1103    }
1104
1105    #[tokio::test]
1106    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
1107        // Enable dynamic but do not send the initial snapshot.
1108        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
1109        assert!(maybe_sender.is_some());
1110
1111        // ready() should not resolve until the initial snapshot is processed.
1112        let res = tokio::time::timeout(std::time::Duration::from_millis(1000), cfg.ready()).await;
1113        assert!(res.is_err(), "ready() should time out without an initial snapshot");
1114    }
1115}