saluki_env/workload/providers/remote_agent/
mod.rs

1//! A workload provider based on the Datadog Agent's remote tagger and workloadmeta APIs.
2
3use std::{future::Future, num::NonZeroUsize};
4
5use memory_accounting::{ComponentRegistry, MemoryBounds, MemoryBoundsBuilder};
6use saluki_config::GenericConfiguration;
7use saluki_context::{
8    origin::{OriginTagCardinality, RawOrigin},
9    tags::SharedTagSet,
10};
11use saluki_error::{generic_error, GenericError};
12use saluki_health::{Health, HealthRegistry};
13use stringtheory::interning::GenericMapInterner;
14
15#[cfg(target_os = "linux")]
16use crate::workload::collectors::CgroupsMetadataCollector;
17use crate::{
18    features::{Feature, FeatureDetector},
19    workload::{
20        aggregator::MetadataAggregator,
21        collectors::{
22            ContainerdMetadataCollector, RemoteAgentTaggerMetadataCollector, RemoteAgentWorkloadMetadataCollector,
23        },
24        entity::EntityId,
25        on_demand_pid::OnDemandPIDResolver,
26        origin::{OriginResolver, ResolvedOrigin},
27        stores::{ExternalDataStore, ExternalDataStoreResolver, TagStore, TagStoreQuerier},
28    },
29    WorkloadProvider,
30};
31
32mod api;
33pub use self::api::RemoteAgentWorkloadAPIHandler;
34
// TODO: Make the entity limits configurable, the same way the string interner size can be overridden via
// `remote_agent_string_interner_size_bytes`.

/// Entity limit passed to `TagStore::with_entity_limit`.
///
/// Evaluated at compile time: a zero value would fail the build rather than panic at runtime.
const DEFAULT_TAG_STORE_ENTITY_LIMIT: NonZeroUsize = match NonZeroUsize::new(2_000) {
    Some(limit) => limit,
    None => panic!("tag store entity limit must be non-zero"),
};

/// Entity limit passed to `ExternalDataStore::with_entity_limit`.
///
/// Evaluated at compile time: a zero value would fail the build rather than panic at runtime.
const DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT: NonZeroUsize = match NonZeroUsize::new(2_000) {
    Some(limit) => limit,
    None => panic!("external data store entity limit must be non-zero"),
};

/// Default size of the shared string interner, in bytes (512 KiB), used when
/// `remote_agent_string_interner_size_bytes` is not set.
///
/// Evaluated at compile time: a zero value would fail the build rather than panic at runtime.
const DEFAULT_STRING_INTERNER_SIZE_BYTES: NonZeroUsize = match NonZeroUsize::new(512 * 1024) {
    Some(size) => size,
    None => panic!("string interner size must be non-zero"),
};
45
46/// Datadog Agent-based workload provider.
47///
48/// This provider is based primarily on the remote tagger API exposed by the Datadog Agent, which handles the bulk of
49/// the work by collecting and aggregating tags for container entities. This remote tagger API operates in a streaming
50/// fashion, which the provider uses to stream update operations to the tag store.
51///
52/// Additionally, two collectors are optionally used: a `containerd` collector and a `cgroups-v2` collector. The
53/// `containerd` collector will, if containerd is running, be used to collect metadata that allows mapping container
54/// PIDs (UDS-based Origin Detection) to container IDs. The `cgroups-v2` collector will collect metadata about the
55/// current set of cgroups v2 controllers, tracking any controllers which appear related to containers and storing a
56/// mapping of controller inodes to container IDs.
57///
58/// These additional collectors are necessary to bridge the gap from container PID and cgroup controller inode, as the
59/// remote tagger API does not stream us these mappings itself and only deals with resolved container IDs.
60#[derive(Clone)]
61pub struct RemoteAgentWorkloadProvider {
62    tags_querier: TagStoreQuerier,
63    origin_resolver: OriginResolver,
64    on_demand_pid_resolver: OnDemandPIDResolver,
65    eds_resolver: ExternalDataStoreResolver,
66}
67
68impl RemoteAgentWorkloadProvider {
69    /// Create a new `RemoteAgentWorkloadProvider` based on the given configuration.
70    pub async fn from_configuration(
71        config: &GenericConfiguration, component_registry: ComponentRegistry, health_registry: &HealthRegistry,
72    ) -> Result<Self, GenericError> {
73        let mut component_registry = component_registry.get_or_create("remote-agent");
74        let mut provider_bounds = component_registry.bounds_builder();
75
76        // Create our string interner which will get used primarily for tags, but also for any other long-ish lived strings.
77        let string_interner_size_bytes = config
78            .try_get_typed::<NonZeroUsize>("remote_agent_string_interner_size_bytes")?
79            .unwrap_or(DEFAULT_STRING_INTERNER_SIZE_BYTES);
80        let string_interner = GenericMapInterner::new(string_interner_size_bytes);
81
82        provider_bounds
83            .subcomponent("string_interner")
84            .firm()
85            .with_fixed_amount("string interner", string_interner_size_bytes.get());
86
87        // Construct our aggregator, and add any collectors based on the detected features we've been given.
88        let aggregator_health = health_registry
89            .register_component("env_provider.workload.remote_agent.aggregator")
90            .ok_or_else(|| {
91                generic_error!(
92                    "Component 'env_provider.workload.remote_agent.aggregator' already registered in health registry."
93                )
94            })?;
95        let mut aggregator = MetadataAggregator::new(aggregator_health);
96
97        let mut collector_bounds = provider_bounds.subcomponent("collectors");
98
99        // Add the containerd collector if the feature is available.
100        let feature_detector = FeatureDetector::automatic(config);
101        if feature_detector.is_feature_available(Feature::Containerd) {
102            let cri_collector = build_collector("containerd", health_registry, &mut collector_bounds, |health| {
103                ContainerdMetadataCollector::from_configuration(config, health, string_interner.clone())
104            })
105            .await?;
106
107            aggregator.add_collector(cri_collector);
108        }
109
110        // Add the cgroups collector if the feature if we're on Linux.
111        #[cfg(target_os = "linux")]
112        {
113            let cgroups_collector = build_collector("cgroups", health_registry, &mut collector_bounds, |health| {
114                CgroupsMetadataCollector::from_configuration(
115                    config,
116                    feature_detector.clone(),
117                    health,
118                    string_interner.clone(),
119                )
120            })
121            .await?;
122
123            aggregator.add_collector(cgroups_collector);
124        }
125
126        // Finally, add the Remote Agent collectors: one for the tagger, and one for workloadmeta.
127        let ra_tags_collector =
128            build_collector("remote-agent-tags", health_registry, &mut collector_bounds, |health| {
129                RemoteAgentTaggerMetadataCollector::from_configuration(config, health, string_interner.clone())
130            })
131            .await?;
132
133        aggregator.add_collector(ra_tags_collector);
134
135        let ra_wmeta_collector =
136            build_collector("remote-agent-wmeta", health_registry, &mut collector_bounds, |health| {
137                RemoteAgentWorkloadMetadataCollector::from_configuration(config, health, string_interner.clone())
138            })
139            .await?;
140
141        aggregator.add_collector(ra_wmeta_collector);
142
143        // Create and attach the various metadata stores.
144        let tag_store = TagStore::with_entity_limit(DEFAULT_TAG_STORE_ENTITY_LIMIT);
145        let tags_querier = tag_store.querier();
146
147        aggregator.add_store(tag_store);
148
149        let external_data_store = ExternalDataStore::with_entity_limit(DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT);
150        let eds_resolver = external_data_store.resolver();
151
152        aggregator.add_store(external_data_store);
153
154        let on_demand_pid_resolver =
155            OnDemandPIDResolver::from_configuration(config, feature_detector, string_interner)?;
156        let origin_resolver = OriginResolver::new(eds_resolver.clone());
157
158        // With the aggregator configured, update the memory bounds and spawn the aggregator.
159        provider_bounds.with_subcomponent("aggregator", &aggregator);
160
161        tokio::spawn(aggregator.run());
162
163        Ok(Self {
164            tags_querier,
165            origin_resolver,
166            on_demand_pid_resolver,
167            eds_resolver,
168        })
169    }
170
171    /// Returns an API handler for dumping the contents of the underlying data stores.
172    ///
173    /// This handler exposes routes for querying the state of the workload provider. See
174    /// [`RemoteAgentWorkloadAPIHandler`] for more information about routes and responses.
175    pub fn api_handler(&self) -> RemoteAgentWorkloadAPIHandler {
176        RemoteAgentWorkloadAPIHandler::from_state(self.tags_querier.clone(), self.eds_resolver.clone())
177    }
178}
179
180impl WorkloadProvider for RemoteAgentWorkloadProvider {
181    fn get_tags_for_entity(&self, entity_id: &EntityId, cardinality: OriginTagCardinality) -> Option<SharedTagSet> {
182        // Query the tag store for the tags associated with the given entity ID.
183        match self.tags_querier.get_entity_tags(entity_id, cardinality) {
184            Some(tags) => Some(tags),
185            None => {
186                // If no tags came back, check if the entity ID is a PID. If it is, we can try to resolve it to a
187                // container ID first before trying again.
188                if let EntityId::ContainerPid(pid) = entity_id {
189                    if let Some(container_id) = self.on_demand_pid_resolver.resolve(*pid) {
190                        // If we successfully resolved the PID to a container ID, try again.
191                        return self.tags_querier.get_entity_tags(&container_id, cardinality);
192                    }
193                }
194
195                None
196            }
197        }
198    }
199
200    fn get_resolved_origin(&self, origin: RawOrigin<'_>) -> Option<ResolvedOrigin> {
201        self.origin_resolver.get_resolved_origin(origin)
202    }
203}
204
205async fn build_collector<F, Fut, O>(
206    collector_name: &str, health_registry: &HealthRegistry, bounds_builder: &mut MemoryBoundsBuilder<'_>, build: F,
207) -> Result<O, GenericError>
208where
209    F: FnOnce(Health) -> Fut,
210    Fut: Future<Output = Result<O, GenericError>>,
211    O: MemoryBounds,
212{
213    let health = health_registry
214        .register_component(format!(
215            "env_provider.workload.remote_agent.collector.{}",
216            collector_name
217        ))
218        .ok_or_else(|| {
219            generic_error!(
220                "Component 'env_provider.workload.remote_agent.collector.{}' already registered in health registry.",
221                collector_name
222            )
223        })?;
224    let collector = build(health).await?;
225    bounds_builder.with_subcomponent(collector_name, &collector);
226
227    Ok(collector)
228}