saluki_env/workload/
on_demand_pid.rs1#[cfg(target_os = "linux")]
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4#[cfg(target_os = "linux")]
5use std::time::Duration;
6
7#[cfg(target_os = "linux")]
8use saluki_common::cache::{Cache, CacheBuilder};
9use saluki_config::GenericConfiguration;
10use saluki_error::GenericError;
11use saluki_metrics::static_metrics;
12use stringtheory::interning::GenericMapInterner;
13#[cfg(target_os = "linux")]
14use tokio::time::sleep;
15#[cfg(target_os = "linux")]
16use tracing::{debug, trace};
17
18#[cfg(target_os = "linux")]
19use crate::workload::helpers::cgroups::{CgroupsConfiguration, CgroupsReader};
20use crate::{features::FeatureDetector, workload::EntityId};
21
// Telemetry for the on-demand PID resolver: gauges, emitted under the `pid_resolver` prefix, that describe the string
// interner passed to `OnDemandPIDResolver::from_configuration`. On Linux they are refreshed by `drive_telemetry`.
static_metrics! {
    name => Telemetry,
    prefix => pid_resolver,
    metrics => [
        // Set from `interner.capacity_bytes()`.
        gauge(interner_capacity_bytes),
        // Set from `interner.len_bytes()`.
        gauge(interner_len_bytes),
        // Set from `interner.len()`.
        gauge(interner_entries),
    ],
}
31
/// Cache of previously resolved mappings from process ID to entity ID.
#[cfg(target_os = "linux")]
type PIDCache = Cache<u32, EntityId>;
/// Capacity, in entries, given to the PID cache builder (`with_capacity`).
#[cfg(target_os = "linux")]
const DEFAULT_PID_CACHE_CACHED_PIDS_LIMIT: usize = 500 * 1_000;

/// Time-to-idle given to the PID cache builder (`with_time_to_idle`): thirty seconds.
#[cfg(target_os = "linux")]
const DEFAULT_PID_CACHE_IDLE_PID_EXPIRATION: Duration = Duration::from_millis(30_000);
38
/// Backend used by `OnDemandPIDResolver` to turn a process ID into an entity ID.
// `Linux` stores a `CgroupsReader` and a `PIDCache` inline while `Noop` stores nothing; presumably that size gap is
// what trips `clippy::large_enum_variant`, and it is silenced rather than boxing the variant.
#[allow(clippy::large_enum_variant)]
enum Inner {
    /// Backend that never resolves anything.
    // In this file `Noop` is only constructed by the non-Linux `from_configuration`, so it is never constructed on
    // Linux builds; `dead_code` is allowed to keep those builds warning-free.
    #[allow(dead_code)]
    Noop,

    /// Linux backend: consults a PID cache first, then falls back to reading cgroups.
    #[cfg(target_os = "linux")]
    Linux {
        /// Reader used to look up the cgroup of a process ID on a cache miss.
        cgroups_reader: CgroupsReader,
        /// Cache of successfully resolved PID-to-entity mappings.
        pid_mappings_cache: PIDCache,
    },
}
50
51impl Inner {
52 fn resolve(&self, process_id: u32) -> Option<EntityId> {
53 match self {
54 Inner::Noop => resolve_noop_pid(process_id),
55
56 #[cfg(target_os = "linux")]
57 Inner::Linux {
58 pid_mappings_cache,
59 cgroups_reader,
60 } => resolve_linux_pid(process_id, pid_mappings_cache, cgroups_reader),
61 }
62 }
63}
64
/// Resolves process IDs to entity IDs on demand.
///
/// On Linux, lookups go through a PID cache and fall back to reading the process's cgroup on a cache miss. On other
/// targets every lookup returns `None`.
///
/// Clones share the same underlying resolver state through an `Arc`.
#[derive(Clone)]
pub struct OnDemandPIDResolver {
    inner: Arc<Inner>,
}
77
78impl OnDemandPIDResolver {
79 #[cfg(not(target_os = "linux"))]
81 pub fn from_configuration(
82 _config: &GenericConfiguration, _feature_detector: FeatureDetector, _interner: GenericMapInterner,
83 ) -> Result<Self, GenericError> {
84 Ok(Self {
86 inner: Arc::new(Inner::Noop),
87 })
88 }
89
90 #[cfg(target_os = "linux")]
96 pub fn from_configuration(
97 config: &GenericConfiguration, feature_detector: FeatureDetector, interner: GenericMapInterner,
98 ) -> Result<Self, GenericError> {
99 use stringtheory::interning::Interner as _;
100
101 let telemetry = Telemetry::new();
102 telemetry
103 .interner_capacity_bytes()
104 .set(interner.capacity_bytes() as f64);
105
106 let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
107 let cgroups_reader = match CgroupsReader::try_from_config(&cgroups_config, interner.clone())? {
108 Some(reader) => reader,
109 None => {
110 return Err(GenericError::msg("Failed to detect any cgroups v1/v2 hierarchy."));
111 }
112 };
113
114 let cache_builder = CacheBuilder::from_identifier("on_demand_pid_resolver")?
115 .with_capacity(NonZeroUsize::new(DEFAULT_PID_CACHE_CACHED_PIDS_LIMIT).unwrap())
116 .with_time_to_idle(Some(DEFAULT_PID_CACHE_IDLE_PID_EXPIRATION));
117
118 let inner = Arc::new(Inner::Linux {
119 cgroups_reader,
120 pid_mappings_cache: cache_builder.build(),
121 });
122
123 tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
124
125 Ok(Self { inner })
126 }
127
128 pub fn resolve(&self, process_id: u32) -> Option<EntityId> {
132 self.inner.resolve(process_id)
133 }
134}
135
136#[cfg(target_os = "linux")]
137async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
138 use stringtheory::interning::Interner as _;
139
140 loop {
141 sleep(Duration::from_secs(1)).await;
142
143 telemetry.interner_entries().set(interner.len() as f64);
144 telemetry
145 .interner_capacity_bytes()
146 .set(interner.capacity_bytes() as f64);
147 telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
148 }
149}
150
151fn resolve_noop_pid(_process_id: u32) -> Option<EntityId> {
152 None
154}
155
156#[cfg(target_os = "linux")]
157fn resolve_linux_pid(
158 process_id: u32, pid_mappings_cache: &PIDCache, cgroups_reader: &CgroupsReader,
159) -> Option<EntityId> {
160 if let Some(container_id) = pid_mappings_cache.get(&process_id) {
162 trace!(
163 "Resolved PID {} to container ID {} from cache.",
164 process_id,
165 container_id
166 );
167 return Some(container_id);
168 }
169
170 match cgroups_reader.get_cgroup_by_pid(process_id) {
172 Some(cgroup) => {
173 let container_eid = EntityId::Container(cgroup.into_container_id());
174
175 debug!("Resolved PID {} to container ID {}.", process_id, container_eid);
176
177 pid_mappings_cache.insert(process_id, container_eid.clone());
178 Some(container_eid)
179 }
180 None => {
181 debug!(
182 "Failed to resolve container ID for PID {}. Process ID may not be part of a container.",
183 process_id
184 );
185 None
186 }
187 }
188}