Skip to main content

saluki_config/
lib.rs

1//! Primitives for working with typed and untyped configuration data.
2#![deny(warnings)]
3#![deny(missing_docs)]
4
5use std::sync::{Arc, OnceLock, RwLock};
6use std::{borrow::Cow, collections::HashSet};
7
8pub use figment::value;
9use figment::{
10    error::Kind,
11    providers::{Env, Serialized},
12    Figment, Provider,
13};
14use saluki_error::GenericError;
15use serde::Deserialize;
16use snafu::{ResultExt as _, Snafu};
17use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
18use tracing::{debug, error};
19
20pub mod dynamic;
21mod provider;
22mod secrets;
23
24pub use self::dynamic::FieldUpdateWatcher;
25use self::dynamic::{ConfigChangeEvent, ConfigUpdate};
26use self::provider::ResolvedProvider;
27
28#[derive(Clone)]
29struct ArcProvider(Arc<dyn Provider + Send + Sync>);
30
31impl Provider for ArcProvider {
32    fn metadata(&self) -> figment::Metadata {
33        self.0.metadata()
34    }
35
36    fn data(&self) -> Result<figment::value::Map<figment::Profile, figment::value::Dict>, figment::Error> {
37        self.0.data()
38    }
39}
40
41enum ProviderSource {
42    Static(ArcProvider),
43    Dynamic(Option<mpsc::Receiver<ConfigUpdate>>),
44}
45
46impl Clone for ProviderSource {
47    fn clone(&self) -> Self {
48        match self {
49            Self::Static(p) => Self::Static(p.clone()),
50            Self::Dynamic(_) => Self::Dynamic(None),
51        }
52    }
53}
54
55/// A configuration error.
56#[derive(Debug, Snafu)]
57#[snafu(context(suffix(false)))]
58pub enum ConfigurationError {
59    /// Environment variable prefix was empty.
60    #[snafu(display("Environment variable prefix must not be empty."))]
61    EmptyPrefix,
62
63    /// Requested field was missing from the configuration.
64    #[snafu(display("Missing field '{}' in configuration. {}", field, help_text))]
65    MissingField {
66        /// Help text describing how to set the missing field.
67        ///
68        /// This is meant to be displayed to the user, and includes environment variable-specific text if environment
69        /// variables had been loaded originally.
70        help_text: String,
71
72        /// Name of the missing field.
73        field: Cow<'static, str>,
74    },
75
76    /// Requested field's data type was not the unexpected data type.
77    #[snafu(display(
78        "Expected value for field '{}' to be '{}', got '{}' instead.",
79        field,
80        expected_ty,
81        actual_ty
82    ))]
83    InvalidFieldType {
84        /// Name of the invalid field.
85        ///
86        /// This is a period-separated path to the field.
87        field: String,
88
89        /// Expected data type.
90        expected_ty: String,
91
92        /// Actual data type.
93        actual_ty: String,
94    },
95
96    /// Generic configuration error.
97    #[snafu(transparent)]
98    Generic {
99        /// Error source.
100        source: GenericError,
101    },
102
103    /// Secrets resolution error.
104    #[snafu(display("Failed to resolve secrets."))]
105    Secrets {
106        /// Error source.
107        source: secrets::Error,
108    },
109}
110
111impl From<figment::Error> for ConfigurationError {
112    fn from(e: figment::Error) -> Self {
113        match e.kind {
114            Kind::InvalidType(actual_ty, expected_ty) => Self::InvalidFieldType {
115                field: e.path.join("."),
116                expected_ty,
117                actual_ty: actual_ty.to_string(),
118            },
119            _ => Self::Generic { source: e.into() },
120        }
121    }
122}
123
/// Describes how a configuration key is spelled in a particular kind of source.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum LookupSource {
    /// The configuration key is looked up in a form suitable for environment variables.
    Environment { prefix: String },
}

impl LookupSource {
    /// Rewrites `key` into the form used by this source.
    ///
    /// For environment variables, periods become underscores, the key is uppercased, and the stored prefix is
    /// prepended as-is.
    fn transform_key(&self, key: &str) -> String {
        let env_suffix = key.replace('.', "_").to_uppercase();
        match self {
            // The prefix is used verbatim: it is expected to already be uppercased and to end with an underscore,
            // since that is the form needed when configuring the environment provider.
            Self::Environment { prefix } => [prefix.as_str(), env_suffix.as_str()].concat(),
        }
    }
}
139
140/// A configuration loader that can pull from various sources.
141///
142/// This loader provides a wrapper around a lower-level library, `figment`, to expose a simpler and focused API for both
143/// loading configuration data from various sources, as well as querying it.
144///
145/// A variety of configuration sources can be configured (see below), with an implicit priority based on the order in
146/// which sources are added: sources added later take precedence over sources prior. Additionally, either a typed value
147/// can be extracted from the configuration ([`into_typed`][Self::into_typed]), or the raw configuration data can be
148/// accessed via a generic API ([`into_generic`][Self::into_generic]).
149///
150/// # Supported sources
151///
152/// - YAML file
153/// - JSON file
154/// - environment variables (must be prefixed; see [`from_environment`][Self::from_environment])
155#[derive(Clone, Default)]
156pub struct ConfigurationLoader {
157    key_aliases: &'static [(&'static str, &'static str)],
158    lookup_sources: HashSet<LookupSource>,
159    provider_sources: Vec<ProviderSource>,
160}
161
162impl ConfigurationLoader {
163    /// Sets key aliases to apply when loading file-based configuration sources.
164    ///
165    /// Each entry is `(nested_path, flat_key)`. When a YAML or JSON file contains a value at `nested_path`
166    /// (dot-separated), that value is also emitted under `flat_key` at the top level — but only if `flat_key`
167    /// is not already explicitly set at the top level. This ensures that both YAML nested format and flat env var
168    /// format produce the same Figment key, so source precedence (env vars > file) works correctly.
169    ///
170    /// Must be called before any file-loading methods ([`from_yaml`][Self::from_yaml], etc.) to take effect.
171    pub fn with_key_aliases(mut self, aliases: &'static [(&'static str, &'static str)]) -> Self {
172        self.key_aliases = aliases;
173        self
174    }
175
176    /// Appends one or more providers to the configuration chain.
177    ///
178    /// Sources are merged in the order they are added: later sources take precedence over earlier ones. Call
179    /// this method after any file-loading methods and before [`from_environment`][Self::from_environment] to
180    /// place the added providers at the correct intermediate precedence level:
181    ///
182    /// ```text
183    /// file providers  <  add_providers(...)  <  from_environment(...)
184    /// ```
185    pub fn add_providers<P, I>(mut self, providers: I) -> Self
186    where
187        P: Provider + Send + Sync + 'static,
188        I: IntoIterator<Item = P>,
189    {
190        for p in providers {
191            self.provider_sources
192                .push(ProviderSource::Static(ArcProvider(Arc::new(p))));
193        }
194        self
195    }
196
197    /// Loads the given YAML configuration file.
198    ///
199    /// # Errors
200    ///
201    /// If the file could not be read, or if the file is not valid YAML, an error will be returned.
202    pub fn from_yaml<P>(mut self, path: P) -> Result<Self, ConfigurationError>
203    where
204        P: AsRef<std::path::Path>,
205    {
206        let resolved_provider = ResolvedProvider::from_yaml(&path, self.key_aliases)?;
207        self.provider_sources
208            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
209        Ok(self)
210    }
211
212    /// Attempts to load the given YAML configuration file, ignoring any errors.
213    ///
214    /// Errors include the file not existing, not being readable/accessible, and not being valid YAML.
215    pub fn try_from_yaml<P>(mut self, path: P) -> Self
216    where
217        P: AsRef<std::path::Path>,
218    {
219        match ResolvedProvider::from_yaml(&path, self.key_aliases) {
220            Ok(resolved_provider) => {
221                self.provider_sources
222                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
223            }
224            Err(e) => {
225                println!(
226                    "Unable to read YAML configuration file '{}': {}. Ignoring.",
227                    path.as_ref().to_string_lossy(),
228                    e
229                );
230            }
231        }
232        self
233    }
234
235    /// Loads the given JSON configuration file.
236    ///
237    /// # Errors
238    ///
239    /// If the file could not be read, or if the file is not valid JSON, an error will be returned.
240    pub fn from_json<P>(mut self, path: P) -> Result<Self, ConfigurationError>
241    where
242        P: AsRef<std::path::Path>,
243    {
244        let resolved_provider = ResolvedProvider::from_json(&path, self.key_aliases)?;
245        self.provider_sources
246            .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
247        Ok(self)
248    }
249
250    /// Attempts to load the given JSON configuration file, ignoring any errors.
251    ///
252    /// Errors include the file not existing, not being readable/accessible, and not being valid JSON.
253    pub fn try_from_json<P>(mut self, path: P) -> Self
254    where
255        P: AsRef<std::path::Path>,
256    {
257        match ResolvedProvider::from_json(&path, self.key_aliases) {
258            Ok(resolved_provider) => {
259                self.provider_sources
260                    .push(ProviderSource::Static(ArcProvider(Arc::new(resolved_provider))));
261            }
262            Err(e) => {
263                println!(
264                    "Unable to read JSON configuration file '{}': {}. Ignoring.",
265                    path.as_ref().to_string_lossy(),
266                    e
267                );
268            }
269        }
270        self
271    }
272
273    /// Loads configuration from environment variables.
274    ///
275    /// The prefix given will have an underscore appended to it if it does not already end with one. For
276    /// example, with a prefix of `app`, any environment variable starting with `app_` would be matched. The
277    /// prefix is case-insensitive.
278    ///
279    /// Sources are merged in the order they are added, with later sources taking precedence over earlier ones.
280    /// Sources added after this call will have higher precedence than environment variables.
281    ///
282    /// # Errors
283    ///
284    /// If the prefix is empty, an error will be returned.
285    pub fn from_environment(mut self, prefix: &'static str) -> Result<Self, ConfigurationError> {
286        if prefix.is_empty() {
287            return Err(ConfigurationError::EmptyPrefix);
288        }
289
290        let prefix = if prefix.ends_with('_') {
291            prefix.to_string()
292        } else {
293            format!("{}_", prefix)
294        };
295
296        // Convert to use Serialized::defaults since, Env isn't Send + Sync
297        let env = Env::prefixed(&prefix).split("__");
298        let values = env.data().unwrap();
299        if let Some(default_dict) = values.get(&figment::Profile::Default) {
300            self.provider_sources
301                .push(ProviderSource::Static(ArcProvider(Arc::new(Serialized::defaults(
302                    default_dict.clone(),
303                )))));
304            self.lookup_sources.insert(LookupSource::Environment { prefix });
305        }
306        Ok(self)
307    }
308
309    /// Resolves secrets in the configuration based on available secret backend configuration.
310    ///
311    /// This will attempt to resolve any secret references (format shown below) in the configuration by using a "secrets
312    /// backend", which is a user-provided command that utilizes a simple JSON-based protocol to accept secrets to
313    /// resolve, and return those resolved secrets, or the errors that occurred during resolving.
314    ///
315    /// # Configuration
316    ///
317    /// This method uses the existing configuration (see Caveats) to determine the secrets backend configuration. The
318    /// following configuration settings are used:
319    ///
320    /// - `secret_backend_command`: The executable to resolve secrets. (required)
321    /// - `secret_backend_timeout`: The timeout for the secrets backend command, in seconds. (optional, default: 30)
322    ///
323    /// # Usage
324    ///
325    /// For any value which should be resolved as a secret, the value should be a string in the format of
326    /// `ENC[secret_reference]`. The `secret_reference` portion is the value that will be sent to the backend command
327    /// during resolution. There is no limitation on the format of the `secret_reference` value, so long as it can be
328    /// expressed through the existing configuration sources (YAML, environment variables, etc).
329    ///
330    /// The entire configuration value must match this pattern, and cannot be used to replace only part of a value, so
331    /// values such as `db-ENC[secret_reference]` would not be detected as secrets and thus would not be resolved.
332    ///
333    /// # Protocol
334    ///
335    /// The executable is expected to accept a JSON object on stdin, with the following format:
336    ///
337    /// ```json
338    /// {
339    ///   "version": "1.0",
340    ///   "secrets": ["key1", "key2"]
341    /// }
342    /// ```
343    ///
344    /// The executable is expected return a JSON object on stdout, with the following format:
345    /// ```json
346    /// {
347    ///   "key1": {
348    ///     "value": "my_secret_password",
349    ///     "error": null
350    ///   },
351    ///   "key2": {
352    ///     "value": null,
353    ///     "error": "could not fetch the secret"
354    ///   }
355    /// }
356    /// ```
357    ///
358    /// If any entry in the response has an `error` value that is anything but `null`, the overall resolution will be
359    /// considered failed.
360    ///
361    /// # Caveats
362    ///
363    /// ## Time of resolution
364    ///
365    /// Secrets resolution happens at the time this method is called, and only resolves configuration values that are
366    /// already present in the configuration, which means all calls to load configuration (`try_from_yaml`,
367    /// `from_environment`, etc) must be made before calling this method.
368    ///
369    /// ## Sensitive data in error output
370    ///
371    /// Care should be taken to not return sensitive information in either the error output (standard error) of the
372    /// backend command or the `error` field in the JSON response, as these values are logged in order to aid debugging.
373    pub async fn with_default_secrets_resolution(mut self) -> Result<Self, ConfigurationError> {
374        let configuration = build_figment_from_sources(&self.provider_sources);
375
376        // If no secrets backend is set, we can't resolve secrets, so just return early.
377        if !has_valid_secret_backend_command(&configuration) {
378            debug!("No secrets backend configured; skipping secrets resolution.");
379            return Ok(self);
380        }
381
382        let resolver_config = configuration.extract::<secrets::resolver::ExternalProcessResolverConfiguration>()?;
383        let resolver = secrets::resolver::ExternalProcessResolver::from_configuration(resolver_config)
384            .await
385            .context(Secrets)?;
386
387        let provider = secrets::Provider::new(resolver, &configuration)
388            .await
389            .context(Secrets)?;
390
391        self.provider_sources
392            .push(ProviderSource::Static(ArcProvider(Arc::new(provider))));
393        Ok(self)
394    }
395
396    /// Enables dynamic configuration.
397    ///
398    /// The receiver is used in `run_dynamic_config_updater` to handle retrieving the initial snapshot and subsequent updates.
399    pub fn with_dynamic_configuration(mut self, receiver: mpsc::Receiver<ConfigUpdate>) -> Self {
400        self.provider_sources.push(ProviderSource::Dynamic(Some(receiver)));
401        self
402    }
403
404    /// Consumes the configuration loader, deserializing it as `T`.
405    ///
406    /// ## Errors
407    ///
408    /// If the configuration could not be deserialized into `T`, an error will be returned.
409    pub fn into_typed<'a, T>(self) -> Result<T, ConfigurationError>
410    where
411        T: Deserialize<'a>,
412    {
413        let figment = build_figment_from_sources(&self.provider_sources);
414        figment.extract().map_err(Into::into)
415    }
416
417    /// Creates a bootstrap `GenericConfiguration` without consuming the loader.
418    ///
419    /// This creates a static snapshot of the configuration loaded so far. As this is intended for bootstrapping
420    /// before dynamic configuration is active, the dynamic provider is ignored.
421    pub fn bootstrap_generic(&self) -> GenericConfiguration {
422        let figment = build_figment_from_sources(&self.provider_sources);
423
424        GenericConfiguration {
425            inner: Arc::new(Inner {
426                figment: RwLock::new(figment),
427                lookup_sources: self.lookup_sources.clone(),
428                event_sender: None,
429                ready_signal: Mutex::new(None),
430            }),
431        }
432    }
433
434    /// Consumes the configuration loader and wraps it in a generic wrapper.
435    pub async fn into_generic(mut self) -> Result<GenericConfiguration, ConfigurationError> {
436        let has_dynamic_provider = self
437            .provider_sources
438            .iter()
439            .any(|s| matches!(s, ProviderSource::Dynamic(_)));
440
441        if has_dynamic_provider {
442            let mut receiver_opt = None;
443            for source in self.provider_sources.iter_mut() {
444                if let ProviderSource::Dynamic(ref mut receiver) = source {
445                    receiver_opt = receiver.take();
446                    break;
447                }
448            }
449            let receiver = receiver_opt.expect("Dynamic receiver should exist but was not found");
450
451            // Build the initial figment object from the static providers. The dynamic provider is empty for now.
452            let figment = build_figment_from_sources(&self.provider_sources);
453
454            let (event_sender, _) = broadcast::channel(100);
455            let (ready_sender, ready_receiver) = oneshot::channel();
456
457            let generic_config = GenericConfiguration {
458                inner: Arc::new(Inner {
459                    figment: RwLock::new(figment),
460                    lookup_sources: self.lookup_sources,
461                    event_sender: Some(event_sender.clone()),
462                    ready_signal: Mutex::new(Some(ready_receiver)),
463                }),
464            };
465
466            // Spawn the background task to handle retrieving the initial snapshot and subsequent updates.
467            tokio::spawn(run_dynamic_config_updater(
468                generic_config.inner.clone(),
469                receiver,
470                self.provider_sources,
471                event_sender,
472                ready_sender,
473            ));
474
475            Ok(generic_config)
476        } else {
477            // Otherwise, just build the static configuration.
478            let figment = build_figment_from_sources(&self.provider_sources);
479
480            Ok(GenericConfiguration {
481                inner: Arc::new(Inner {
482                    figment: RwLock::new(figment),
483                    lookup_sources: self.lookup_sources,
484                    event_sender: None,
485                    ready_signal: Mutex::new(None),
486                }),
487            })
488        }
489    }
490
491    /// Configures a [`GenericConfiguration`] that is suitable for tests.
492    ///
493    /// This configures the loader with the following defaults:
494    ///
495    /// - configuration from a JSON file
496    /// - configuration from environment variables
497    ///
498    /// If `enable_dynamic_configuration` is true, a dynamic configuration sender is returned.
499    ///
500    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
501    pub async fn for_tests(
502        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
503        enable_dynamic_configuration: bool,
504    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>) {
505        Self::for_tests_with_provider_factory(file_values, env_vars, enable_dynamic_configuration, &[], || {
506            Serialized::defaults(serde_json::json!({}))
507        })
508        .await
509    }
510
511    /// Like [`for_tests`][Self::for_tests], but applies `key_aliases` during file loading and calls
512    /// `provider_factory` to build an additional provider inserted between the file provider and the
513    /// environment provider.
514    ///
515    /// The factory is called after test environment variables have been set, so any env var reads it performs
516    /// (e.g. in `DatadogRemapper`) are consistent with the test's env setup.
517    ///
518    /// This is generally only useful for testing purposes, and is exposed publicly in order to be used in cross-crate testing scenarios.
519    pub async fn for_tests_with_provider_factory<P, F>(
520        file_values: Option<serde_json::Value>, env_vars: Option<&[(String, String)]>,
521        enable_dynamic_configuration: bool, key_aliases: &'static [(&'static str, &'static str)], provider_factory: F,
522    ) -> (GenericConfiguration, Option<tokio::sync::mpsc::Sender<ConfigUpdate>>)
523    where
524        P: Provider + Send + Sync + 'static,
525        F: FnOnce() -> P,
526    {
527        let json_file = tempfile::NamedTempFile::new().expect("should not fail to create temp file.");
528        let path = &json_file.path();
529        let json_to_write = file_values.unwrap_or(serde_json::json!({}));
530        serde_json::to_writer(&json_file, &json_to_write).expect("should not fail to write to temp file.");
531
532        let mut loader = ConfigurationLoader::default()
533            .with_key_aliases(key_aliases)
534            .try_from_json(path);
535        let mut maybe_sender = None;
536        if enable_dynamic_configuration {
537            let (sender, receiver) = tokio::sync::mpsc::channel(1);
538            loader = loader.with_dynamic_configuration(receiver);
539            maybe_sender = Some(sender);
540        }
541
542        static ENV_MUTEX: OnceLock<std::sync::Mutex<()>> = OnceLock::new();
543
544        let guard = ENV_MUTEX.get_or_init(|| std::sync::Mutex::new(())).lock().unwrap();
545
546        if let Some(pairs) = env_vars.as_ref() {
547            for (k, v) in pairs.iter() {
548                // Set under both the raw name and the TEST_ prefix:
549                //   - Raw name: available to any env-reading providers (e.g. DatadogRemapper)
550                //   - TEST_ prefix: read by from_environment("TEST") (simulates DD_ prefix)
551                std::env::set_var(k, v);
552                std::env::set_var(format!("TEST_{}", k), v);
553            }
554        }
555
556        // Build and insert the extra provider while env vars are set so it can snapshot them.
557        let loader = loader.add_providers([provider_factory()]);
558
559        // Add environment provider last so it has the highest precedence.
560        let loader = loader
561            .from_environment("TEST")
562            .expect("should not fail to add environment provider");
563
564        // Clean up test-provided env vars now that all providers have been built.
565        if let Some(pairs) = env_vars.as_ref() {
566            for (k, _) in pairs.iter() {
567                std::env::remove_var(k);
568                std::env::remove_var(format!("TEST_{}", k));
569            }
570        }
571
572        drop(guard);
573
574        let cfg = loader
575            .into_generic()
576            .await
577            .expect("should not fail to build generic configuration");
578
579        (cfg, maybe_sender)
580    }
581}
582
583fn build_figment_from_sources(sources: &[ProviderSource]) -> Figment {
584    sources.iter().fold(Figment::new(), |figment, source| match source {
585        ProviderSource::Static(p) => figment.admerge(p.clone()),
586        // No-op. The merging is handled by the updater task.
587        ProviderSource::Dynamic(_) => figment,
588    })
589}
590
591/// Inserts or updates a value for a key.
592///
593/// Intermediate objects are created if they don't exist.
594pub fn upsert(root: &mut serde_json::Value, key: &str, value: serde_json::Value) {
595    if !root.is_object() {
596        *root = serde_json::Value::Object(serde_json::Map::new());
597    }
598
599    let mut current = root;
600    // Create a new node for each segment if the key is dotted.
601    let mut segments = key.split('.').peekable();
602
603    while let Some(seg) = segments.next() {
604        let is_leaf = segments.peek().is_none();
605
606        // Ensure current is an object before operating
607        if !current.is_object() {
608            *current = serde_json::Value::Object(serde_json::Map::new());
609        }
610        let node = current.as_object_mut().expect("current node should be an object");
611
612        if is_leaf {
613            node.insert(seg.to_string(), value);
614            break;
615        } else {
616            // Ensure child exists and is an object
617            let should_create_node = match node.get(seg) {
618                Some(v) => !v.is_object(),
619                None => true,
620            };
621            // Check if we need to create an intermediate node if it doesn't exist.
622            if should_create_node {
623                node.insert(seg.to_string(), serde_json::Value::Object(serde_json::Map::new()));
624            }
625
626            // Advance the current node to the next level.
627            current = node.get_mut(seg).expect("should not fail to get nested object");
628        }
629    }
630}
631
632async fn run_dynamic_config_updater(
633    inner: Arc<Inner>, mut receiver: mpsc::Receiver<ConfigUpdate>, provider_sources: Vec<ProviderSource>,
634    sender: broadcast::Sender<ConfigChangeEvent>, ready_sender: oneshot::Sender<()>,
635) {
636    // The first message on the channel will be the initial snapshot.
637    let initial_update = match receiver.recv().await {
638        Some(update) => update,
639        None => {
640            // The channel was closed before we even received the initial snapshot.
641            debug!("Dynamic configuration channel closed before initial snapshot.");
642            return;
643        }
644    };
645
646    let mut dynamic_state = match initial_update {
647        ConfigUpdate::Snapshot(state) => state,
648        ConfigUpdate::Partial { .. } => {
649            // This is theoretically unreachable, as `configstream` should always send a snapshot first.
650            error!("First dynamic config message was not a snapshot. Updater may be in an inconsistent state.");
651            serde_json::Value::Null
652        }
653    };
654
655    // Rebuild the configuration with the initial snapshot.
656    let new_figment = provider_sources
657        .iter()
658        .fold(Figment::new(), |figment, source| match source {
659            ProviderSource::Static(p) => figment.admerge(p.clone()),
660            ProviderSource::Dynamic(_) => {
661                figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
662            }
663        });
664
665    // Update the main figment object and then release the lock.
666    {
667        let mut figment_guard = inner.figment.write().unwrap();
668        *figment_guard = new_figment.clone();
669    }
670
671    // Signal that the initial snapshot has been processed and the configuration is ready.
672    if ready_sender.send(()).is_err() {
673        debug!("Configuration readiness receiver dropped. Updater task shutting down.");
674        return;
675    }
676
677    // Set our "current" state for the main loop.
678    let mut current_config: figment::value::Value = new_figment.extract().unwrap();
679
680    // Enter the main loop to process subsequent updates.
681    loop {
682        let update = match receiver.recv().await {
683            Some(update) => update,
684            None => {
685                // The sender was dropped, which means the config stream has terminated. We can exit.
686                debug!("Dynamic configuration update channel closed. Updater task shutting down.");
687                return;
688            }
689        };
690
691        // Update our local dynamic state based on the received message.
692        match update {
693            ConfigUpdate::Snapshot(new_state) => {
694                debug!("Received configuration snapshot update.");
695                dynamic_state = new_state;
696            }
697            ConfigUpdate::Partial { key, value } => {
698                debug!(%key, "Received partial configuration update.");
699                if dynamic_state.is_null() {
700                    dynamic_state = serde_json::Value::Object(serde_json::Map::new());
701                }
702                if dynamic_state.is_object() {
703                    upsert(&mut dynamic_state, &key, value);
704                } else {
705                    error!(
706                        "Received partial update but current dynamic state is not an object. This should not happen."
707                    );
708                }
709            }
710        }
711
712        // Rebuild the figment object on every update, respecting the original provider order.
713        let new_figment = provider_sources
714            .iter()
715            .fold(Figment::new(), |figment, source| match source {
716                ProviderSource::Static(p) => figment.admerge(p.clone()),
717                ProviderSource::Dynamic(_) => {
718                    figment.admerge(figment::providers::Serialized::defaults(dynamic_state.clone()))
719                }
720            });
721
722        let new_config: figment::value::Value = new_figment.clone().extract().unwrap();
723
724        if current_config != new_config {
725            for change in dynamic::diff_config(&current_config, &new_config) {
726                // Send the change event to any receivers of the dynamic handler.
727                // If there are no receivers, `send` will fail. This is expected and fine,
728                // so we can ignore the error to avoid log spam.
729                let _ = sender.send(change);
730            }
731
732            let mut figment_guard = inner.figment.write().unwrap_or_else(|e| {
733                error!("Failed to acquire write lock for dynamic configuration: {}", e);
734                e.into_inner()
735            });
736            *figment_guard = new_figment;
737
738            // Update our "current" state for the next iteration.
739            current_config = new_config;
740        }
741    }
742}
743
744#[derive(Debug)]
745struct Inner {
746    figment: RwLock<Figment>,
747    lookup_sources: HashSet<LookupSource>,
748    event_sender: Option<broadcast::Sender<ConfigChangeEvent>>,
749    ready_signal: Mutex<Option<oneshot::Receiver<()>>>,
750}
751
752/// A generic configuration object.
753///
754/// This represents the merged configuration derived from [`ConfigurationLoader`] in its raw form.  Values can be
755/// queried by key, and can be extracted either as typed values or in their raw form.
756///
757/// Keys must be in the form of `a.b.c`, where periods (`.`) as used to indicate a nested value.
758///
759/// Using an example JSON configuration:
760///
761/// ```json
762/// {
763///   "a": {
764///     "b": {
765///       "c": "value"
766///     }
767///   }
768/// }
769/// ```
770///
771/// Querying for the value of `a.b.c` would return `"value"`, and querying for `a.b` would return the nested object `{
772/// "c": "value" }`.
773#[derive(Clone, Debug)]
774pub struct GenericConfiguration {
775    inner: Arc<Inner>,
776}
777
778impl GenericConfiguration {
779    /// Waits for the configuration to be ready, if dynamic configuration is enabled.
780    ///
781    /// If dynamic configuration is in use, this method will asynchronously wait until the first snapshot has been
782    /// received and applied.
783    ///
784    /// If dynamic configuration is not used, it returns immediately.
785    pub async fn ready(&self) {
786        // We need a lock to both ensure that multiple callers can race against this,
787        // and to allow us mutable access to consume the receiver.
788        let mut maybe_ready_rx = self.inner.ready_signal.lock().await;
789        if let Some(ready_rx) = maybe_ready_rx.take() {
790            // We're the first caller to wait for readiness.
791            if ready_rx.await.is_err() {
792                error!("Failed to receive configuration readiness signal; updater task may have panicked.");
793            }
794        }
795    }
796
797    fn get<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
798    where
799        T: Deserialize<'a>,
800    {
801        let figment_guard = self.inner.figment.read().unwrap();
802        match figment_guard.extract_inner(key) {
803            Ok(value) => Ok(value),
804            Err(e) => {
805                if matches!(e.kind, figment::error::Kind::MissingField(_)) {
806                    // We might have been given a key that uses nested notation -- `foo.bar` -- but is only present in the
807                    // environment variables. We specifically don't want to use a different separator in environment
808                    // variables to map to nested key separators, so we simply try again here but with all nested key
809                    // separators (`.`) replaced with `_`, to match environment variables.
810                    let fallback_key = key.replace('.', "_");
811                    figment_guard
812                        .extract_inner(&fallback_key)
813                        .map_err(|fallback_e| from_figment_error(&self.inner.lookup_sources, fallback_e))
814                } else {
815                    Err(e.into())
816                }
817            }
818        }
819    }
820
821    /// Gets a configuration value by key.
822    ///
823    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
824    ///
825    /// ## Errors
826    ///
827    /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error
828    /// variant will be returned.
829    pub fn get_typed<'a, T>(&self, key: &str) -> Result<T, ConfigurationError>
830    where
831        T: Deserialize<'a>,
832    {
833        self.get(key)
834    }
835
836    /// Gets a configuration value by key, or the default value if a key does not exist or could not be deserialized.
837    ///
838    /// The `Default` implementation of `T` will be used both if the key could not be found, as well as for any error
839    /// during deserialization. This effectively swallows any errors and should generally be used sparingly.
840    ///
841    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
842    pub fn get_typed_or_default<'a, T>(&self, key: &str) -> T
843    where
844        T: Default + Deserialize<'a>,
845    {
846        self.get(key).unwrap_or_default()
847    }
848
849    /// Gets a configuration value by key, if it exists.
850    ///
851    /// If the key exists in the configuration, and can be deserialized, `Ok(Some(value))` is returned. Otherwise,
852    /// `Ok(None)` will be returned.
853    ///
854    /// The key must be in the form of `a.b.c`, where periods (`.`) are used to indicate a nested lookup.
855    ///
856    /// ## Errors
857    ///
858    /// If the value could not be deserialized into `T`, an error will be returned.
859    pub fn try_get_typed<'a, T>(&self, key: &str) -> Result<Option<T>, ConfigurationError>
860    where
861        T: Deserialize<'a>,
862    {
863        match self.get(key) {
864            Ok(value) => Ok(Some(value)),
865            Err(ConfigurationError::MissingField { .. }) => Ok(None),
866            Err(e) => Err(e),
867        }
868    }
869
870    /// Attempts to deserialize the entire configuration as `T`.
871    ///
872    /// ## Errors
873    ///
874    /// If the value could not be deserialized into `T`, an error will be returned.
875    pub fn as_typed<'a, T>(&self) -> Result<T, ConfigurationError>
876    where
877        T: Deserialize<'a>,
878    {
879        self.inner
880            .figment
881            .read()
882            .unwrap()
883            .extract()
884            .map_err(|e| from_figment_error(&self.inner.lookup_sources, e))
885    }
886
887    /// Subscribes for updates to the configuration.
888    pub fn subscribe_for_updates(&self) -> Option<broadcast::Receiver<dynamic::ConfigChangeEvent>> {
889        self.inner.event_sender.as_ref().map(|s| s.subscribe())
890    }
891
892    /// Creates a watcher that yields only when the given key changes.
893    ///
894    /// If dynamic configuration is disabled, the returned watcher's `changed()`
895    /// will wait indefinitely.
896    pub fn watch_for_updates(&self, key: &str) -> FieldUpdateWatcher {
897        FieldUpdateWatcher {
898            key: key.to_string(),
899            rx: self.subscribe_for_updates(),
900        }
901    }
902}
903
904fn from_figment_error(lookup_sources: &HashSet<LookupSource>, e: figment::Error) -> ConfigurationError {
905    match e.kind {
906        Kind::MissingField(field) => {
907            let mut valid_keys = lookup_sources
908                .iter()
909                .map(|source| source.transform_key(&field))
910                .collect::<Vec<_>>();
911
912            // Always specify the original key as a valid key to try.
913            valid_keys.insert(0, field.to_string());
914
915            let help_text = format!("Try setting `{}`.", valid_keys.join("` or `"));
916
917            ConfigurationError::MissingField { help_text, field }
918        }
919        Kind::InvalidType(actual_ty, expected_ty) => ConfigurationError::InvalidFieldType {
920            field: e.path.join("."),
921            expected_ty,
922            actual_ty: actual_ty.to_string(),
923        },
924        _ => ConfigurationError::Generic { source: e.into() },
925    }
926}
927
928fn has_valid_secret_backend_command(configuration: &Figment) -> bool {
929    configuration
930        .find_value("secret_backend_command")
931        .ok()
932        .is_some_and(|v| v.as_str().filter(|s| !s.is_empty()).is_some())
933}
934
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Longest time a test waits for a single change event before failing.
    const UPDATE_TIMEOUT: Duration = Duration::from_secs(2);

    // Builds a `Figment` whose only provider is the given JSON literal.
    macro_rules! json_to_figment {
        ($json:tt) => {
            Figment::from(Serialized::defaults(serde_json::json!($json)))
        };
    }

    /// Base values shared by the tests that need a small, non-empty configuration.
    fn sample_values() -> serde_json::Value {
        serde_json::json!({
            "foo": "bar",
            "baz": 5,
            "foobar": { "a": false, "b": "c" }
        })
    }

    /// Receives change events until one for `key` arrives, and returns it.
    ///
    /// Events for other keys are skipped. Panics if the channel reports an error, or if no matching event arrives
    /// within `UPDATE_TIMEOUT`.
    async fn wait_for_change(rx: &mut broadcast::Receiver<ConfigChangeEvent>, key: &str) -> ConfigChangeEvent {
        let matching_event = async {
            loop {
                match rx.recv().await {
                    Ok(ev) if ev.key == key => break ev,
                    Ok(_) => continue,
                    Err(e) => panic!("updates channel closed: {e}"),
                }
            }
        };

        tokio::time::timeout(UPDATE_TIMEOUT, matching_event)
            .await
            .unwrap_or_else(|_| panic!("timed out waiting for `{key}` update"))
    }

    #[test]
    fn test_has_valid_secret_backend_command() {
        // When `secret_backend_command` is not set at all, or is set to an empty string, or isn't even a string
        // value... then we should consider those scenarios as "secrets backend not configured".
        let figment = Figment::new();
        assert!(!has_valid_secret_backend_command(&figment));

        let figment = json_to_figment!({
            "secret_backend_command": ""
        });
        assert!(!has_valid_secret_backend_command(&figment));

        let figment = json_to_figment!({
            "secret_backend_command": false
        });
        assert!(!has_valid_secret_backend_command(&figment));

        // Otherwise, whether it's a valid path or not, then we should consider things enabled, which means we'll
        // at least attempt secrets resolution:
        let figment = json_to_figment!({
            "secret_backend_command": "/usr/bin/foo"
        });
        assert!(has_valid_secret_backend_command(&figment));

        let figment = json_to_figment!({
            "secret_backend_command": "or anything else"
        });
        assert!(has_valid_secret_backend_command(&figment));
    }

    #[tokio::test]
    async fn test_static_configuration() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(sample_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");
        assert_eq!(cfg.get_typed::<i32>("baz").unwrap(), 5);
        assert!(!cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
        assert!(matches!(
            cfg.get::<String>("nonexistentKey"),
            Err(ConfigurationError::MissingField { .. })
        ));
    }

    #[tokio::test]
    async fn test_dynamic_configuration() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(sample_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");
        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "new": "from_snapshot",
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // Test that existing values still exist.
        assert_eq!(cfg.get_typed::<String>("foo").unwrap(), "bar");

        // Test that new values from the snapshot exist.
        assert_eq!(cfg.get_typed::<String>("new").unwrap(), "from_snapshot");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_key".to_string(),
                value: "from dynamic update".to_string().into(),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "new_key").await;

        assert_eq!(cfg.get_typed::<String>("new_key").unwrap(), "from dynamic update");

        // Test that an update with a nested key is applied.
        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(true),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "foobar.a").await;

        assert!(cfg.get_typed::<bool>("foobar.a").unwrap());
        assert_eq!(cfg.get_typed::<String>("foobar.b").unwrap(), "c");
    }

    #[tokio::test]
    async fn test_environment_precedence_over_dynamic() {
        let (cfg, sender) = ConfigurationLoader::for_tests(
            Some(sample_values()),
            Some(&[("ENV_VAR".to_string(), "from_env".to_string())]),
            true,
        )
        .await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({
                "env_var": "from_snapshot_env_var"
            })))
            .await
            .unwrap();

        cfg.ready().await;

        // Env provider has highest precedence so the snapshot should not override it.
        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        // Send a partial update that attempts to override the env-backed key.
        sender
            .send(ConfigUpdate::Partial {
                key: "env_var".to_string(),
                value: serde_json::json!("from_partial"),
            })
            .await
            .unwrap();

        // Also send a partial update for a nested key that comes from the base values rather than the environment.
        sender
            .send(ConfigUpdate::Partial {
                key: "foobar.a".to_string(),
                value: serde_json::json!(false),
            })
            .await
            .unwrap();

        // Send a dummy partial update to ensure the updater has processed prior partials.
        sender
            .send(ConfigUpdate::Partial {
                key: "dummy".to_string(),
                value: serde_json::json!(1),
            })
            .await
            .unwrap();

        wait_for_change(&mut rx, "dummy").await;

        assert_eq!(cfg.get_typed::<String>("env_var").unwrap(), "from_env");
    }

    #[tokio::test]
    async fn test_dynamic_configuration_add_new_nested_key() {
        let (cfg, sender) = ConfigurationLoader::for_tests(Some(sample_values()), None, true).await;
        let sender = sender.expect("sender should exist");

        sender
            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
            .await
            .unwrap();
        cfg.ready().await;

        let mut rx = cfg.subscribe_for_updates().expect("dynamic updates should be enabled");

        sender
            .send(ConfigUpdate::Partial {
                key: "new_parent.new_child".to_string(),
                value: serde_json::json!(42),
            })
            .await
            .unwrap();

        // new_parent object did not exist before, so the diff will emit the object "new_parent"
        wait_for_change(&mut rx, "new_parent").await;

        assert_eq!(cfg.get_typed::<i32>("new_parent.new_child").unwrap(), 42);
    }

    #[tokio::test]
    async fn test_underscore_fallback_on_get() {
        let (cfg, _) = ConfigurationLoader::for_tests(
            Some(serde_json::json!({})),
            Some(&[("RANDOM_KEY".to_string(), "from_env_only".to_string())]),
            false,
        )
        .await;
        cfg.ready().await;

        assert_eq!(cfg.get_typed::<String>("random.key").unwrap(), "from_env_only");
    }

    #[tokio::test]
    async fn test_static_configuration_ready_and_subscribe() {
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, false).await;
        assert!(maybe_sender.is_none());

        tokio::time::timeout(Duration::from_millis(500), cfg.ready())
            .await
            .expect("ready() should not block when dynamic is disabled");

        assert!(cfg.subscribe_for_updates().is_none());
    }

    #[tokio::test]
    async fn test_dynamic_configuration_ready_requires_initial_snapshot() {
        // Enable dynamic but do not send the initial snapshot.
        let (cfg, maybe_sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
        assert!(maybe_sender.is_some());

        // ready() should not resolve until the initial snapshot is processed.
        let res = tokio::time::timeout(Duration::from_millis(1000), cfg.ready()).await;
        assert!(res.is_err(), "ready() should time out without an initial snapshot");
    }
}