saluki_env/workload/providers/remote_agent/
mod.rs1use std::{future::Future, num::NonZeroUsize};
4
5use memory_accounting::{ComponentRegistry, MemoryBounds, MemoryBoundsBuilder};
6use saluki_config::GenericConfiguration;
7use saluki_context::{
8 origin::{OriginTagCardinality, RawOrigin},
9 tags::SharedTagSet,
10};
11use saluki_error::{generic_error, GenericError};
12use saluki_health::{Health, HealthRegistry};
13use stringtheory::interning::GenericMapInterner;
14
15#[cfg(target_os = "linux")]
16use crate::workload::collectors::CgroupsMetadataCollector;
17use crate::{
18 features::{Feature, FeatureDetector},
19 workload::{
20 aggregator::MetadataAggregator,
21 collectors::{
22 ContainerdMetadataCollector, RemoteAgentTaggerMetadataCollector, RemoteAgentWorkloadMetadataCollector,
23 },
24 entity::EntityId,
25 on_demand_pid::OnDemandPIDResolver,
26 origin::{OriginResolver, ResolvedOrigin},
27 stores::{ExternalDataStore, ExternalDataStoreResolver, TagStore, TagStoreQuerier},
28 },
29 WorkloadProvider,
30};
31
32mod api;
33pub use self::api::RemoteAgentWorkloadAPIHandler;
34
35const DEFAULT_TAG_STORE_ENTITY_LIMIT: NonZeroUsize = NonZeroUsize::new(2000).unwrap();
39
40const DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT: NonZeroUsize = NonZeroUsize::new(2000).unwrap();
42
43const DEFAULT_STRING_INTERNER_SIZE_BYTES: NonZeroUsize = NonZeroUsize::new(512 * 1024).unwrap(); #[derive(Clone)]
61pub struct RemoteAgentWorkloadProvider {
62 tags_querier: TagStoreQuerier,
63 origin_resolver: OriginResolver,
64 on_demand_pid_resolver: OnDemandPIDResolver,
65 eds_resolver: ExternalDataStoreResolver,
66}
67
68impl RemoteAgentWorkloadProvider {
69 pub async fn from_configuration(
71 config: &GenericConfiguration, component_registry: ComponentRegistry, health_registry: &HealthRegistry,
72 ) -> Result<Self, GenericError> {
73 let mut component_registry = component_registry.get_or_create("remote-agent");
74 let mut provider_bounds = component_registry.bounds_builder();
75
76 let string_interner_size_bytes = config
78 .try_get_typed::<NonZeroUsize>("remote_agent_string_interner_size_bytes")?
79 .unwrap_or(DEFAULT_STRING_INTERNER_SIZE_BYTES);
80 let string_interner = GenericMapInterner::new(string_interner_size_bytes);
81
82 provider_bounds
83 .subcomponent("string_interner")
84 .firm()
85 .with_fixed_amount("string interner", string_interner_size_bytes.get());
86
87 let aggregator_health = health_registry
89 .register_component("env_provider.workload.remote_agent.aggregator")
90 .ok_or_else(|| {
91 generic_error!(
92 "Component 'env_provider.workload.remote_agent.aggregator' already registered in health registry."
93 )
94 })?;
95 let mut aggregator = MetadataAggregator::new(aggregator_health);
96
97 let mut collector_bounds = provider_bounds.subcomponent("collectors");
98
99 let feature_detector = FeatureDetector::automatic(config);
101 if feature_detector.is_feature_available(Feature::Containerd) {
102 let cri_collector = build_collector("containerd", health_registry, &mut collector_bounds, |health| {
103 ContainerdMetadataCollector::from_configuration(config, health, string_interner.clone())
104 })
105 .await?;
106
107 aggregator.add_collector(cri_collector);
108 }
109
110 #[cfg(target_os = "linux")]
112 {
113 let cgroups_collector = build_collector("cgroups", health_registry, &mut collector_bounds, |health| {
114 CgroupsMetadataCollector::from_configuration(
115 config,
116 feature_detector.clone(),
117 health,
118 string_interner.clone(),
119 )
120 })
121 .await?;
122
123 aggregator.add_collector(cgroups_collector);
124 }
125
126 let ra_tags_collector =
128 build_collector("remote-agent-tags", health_registry, &mut collector_bounds, |health| {
129 RemoteAgentTaggerMetadataCollector::from_configuration(config, health, string_interner.clone())
130 })
131 .await?;
132
133 aggregator.add_collector(ra_tags_collector);
134
135 let ra_wmeta_collector =
136 build_collector("remote-agent-wmeta", health_registry, &mut collector_bounds, |health| {
137 RemoteAgentWorkloadMetadataCollector::from_configuration(config, health, string_interner.clone())
138 })
139 .await?;
140
141 aggregator.add_collector(ra_wmeta_collector);
142
143 let tag_store = TagStore::with_entity_limit(DEFAULT_TAG_STORE_ENTITY_LIMIT);
145 let tags_querier = tag_store.querier();
146
147 aggregator.add_store(tag_store);
148
149 let external_data_store = ExternalDataStore::with_entity_limit(DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT);
150 let eds_resolver = external_data_store.resolver();
151
152 aggregator.add_store(external_data_store);
153
154 let on_demand_pid_resolver =
155 OnDemandPIDResolver::from_configuration(config, feature_detector, string_interner)?;
156 let origin_resolver = OriginResolver::new(eds_resolver.clone());
157
158 provider_bounds.with_subcomponent("aggregator", &aggregator);
160
161 tokio::spawn(aggregator.run());
162
163 Ok(Self {
164 tags_querier,
165 origin_resolver,
166 on_demand_pid_resolver,
167 eds_resolver,
168 })
169 }
170
171 pub fn api_handler(&self) -> RemoteAgentWorkloadAPIHandler {
176 RemoteAgentWorkloadAPIHandler::from_state(self.tags_querier.clone(), self.eds_resolver.clone())
177 }
178}
179
180impl WorkloadProvider for RemoteAgentWorkloadProvider {
181 fn get_tags_for_entity(&self, entity_id: &EntityId, cardinality: OriginTagCardinality) -> Option<SharedTagSet> {
182 match self.tags_querier.get_entity_tags(entity_id, cardinality) {
184 Some(tags) => Some(tags),
185 None => {
186 if let EntityId::ContainerPid(pid) = entity_id {
189 if let Some(container_id) = self.on_demand_pid_resolver.resolve(*pid) {
190 return self.tags_querier.get_entity_tags(&container_id, cardinality);
192 }
193 }
194
195 None
196 }
197 }
198 }
199
200 fn get_resolved_origin(&self, origin: RawOrigin<'_>) -> Option<ResolvedOrigin> {
201 self.origin_resolver.get_resolved_origin(origin)
202 }
203}
204
205async fn build_collector<F, Fut, O>(
206 collector_name: &str, health_registry: &HealthRegistry, bounds_builder: &mut MemoryBoundsBuilder<'_>, build: F,
207) -> Result<O, GenericError>
208where
209 F: FnOnce(Health) -> Fut,
210 Fut: Future<Output = Result<O, GenericError>>,
211 O: MemoryBounds,
212{
213 let health = health_registry
214 .register_component(format!(
215 "env_provider.workload.remote_agent.collector.{}",
216 collector_name
217 ))
218 .ok_or_else(|| {
219 generic_error!(
220 "Component 'env_provider.workload.remote_agent.collector.{}' already registered in health registry.",
221 collector_name
222 )
223 })?;
224 let collector = build(health).await?;
225 bounds_builder.with_subcomponent(collector_name, &collector);
226
227 Ok(collector)
228}