Skip to main content

saluki_env/workload/
on_demand_pid.rs

1#[cfg(target_os = "linux")]
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4#[cfg(target_os = "linux")]
5use std::time::Duration;
6
7#[cfg(target_os = "linux")]
8use saluki_common::cache::{Cache, CacheBuilder};
9use saluki_config::GenericConfiguration;
10use saluki_error::GenericError;
11use saluki_metrics::static_metrics;
12use stringtheory::interning::GenericMapInterner;
13#[cfg(target_os = "linux")]
14use tokio::time::sleep;
15#[cfg(target_os = "linux")]
16use tracing::{debug, trace};
17
18#[cfg(target_os = "linux")]
19use crate::workload::helpers::cgroups::{CgroupsConfiguration, CgroupsReader};
20use crate::{features::FeatureDetector, workload::EntityId};
21
// Declares the `Telemetry` handle used by this module: three gauges, registered under the `pid_resolver` prefix,
// that report the state of the string interner handed to the resolver. Elsewhere in this file the generated type is
// built with `Telemetry::new()`, cloned, and each gauge is reached through an accessor named after the metric (for
// example `interner_entries()`), which is then updated with `.set(f64)`.
static_metrics! {
    name => Telemetry,
    prefix => pid_resolver,
    metrics => [
        gauge(interner_capacity_bytes),
        gauge(interner_len_bytes),
        gauge(interner_entries),
    ],
}
31
/// Cache of already-resolved PIDs: maps a process ID to the entity ID it was resolved to.
#[cfg(target_os = "linux")]
type PIDCache = Cache<u32, EntityId>;
/// Capacity handed to the PID cache builder, i.e. the number of PID mappings the cache is sized for.
#[cfg(target_os = "linux")]
const DEFAULT_PID_CACHE_CACHED_PIDS_LIMIT: usize = 500_000;
/// Time-to-idle handed to the PID cache builder (30 seconds).
#[cfg(target_os = "linux")]
const DEFAULT_PID_CACHE_IDLE_PID_EXPIRATION: Duration = Duration::from_secs(30);
38
/// Platform-specific resolver implementation backing `OnDemandPIDResolver`.
// NOTE(review): the lint is presumably silenced because `Linux` carries a reader and a cache while `Noop` carries
// nothing -- confirm before boxing the payload instead.
#[allow(clippy::large_enum_variant)]
enum Inner {
    /// Resolver that never resolves anything.
    // In this file `Noop` is only constructed by the non-Linux constructor, so it is unused on Linux builds.
    #[allow(dead_code)]
    Noop,

    /// Resolver backed by the host's cgroups hierarchy, fronted by a cache of previously resolved PIDs.
    #[cfg(target_os = "linux")]
    Linux {
        /// Looks up the cgroup that a given PID belongs to.
        cgroups_reader: CgroupsReader,
        /// PID-to-entity mappings that have already been resolved.
        pid_mappings_cache: PIDCache,
    },
}
50
51impl Inner {
52    fn resolve(&self, process_id: u32) -> Option<EntityId> {
53        match self {
54            Inner::Noop => resolve_noop_pid(process_id),
55
56            #[cfg(target_os = "linux")]
57            Inner::Linux {
58                pid_mappings_cache,
59                cgroups_reader,
60            } => resolve_linux_pid(process_id, pid_mappings_cache, cgroups_reader),
61        }
62    }
63}
64
/// A resolver for mapping process IDs to their container IDs based on querying the underlying host.
///
/// Cloning the resolver is cheap: clones share the same underlying state.
///
/// # Platform support
///
/// On Linux platforms, PIDs are resolved by querying procfs to find the cgroup of the process. If one exists, the
/// cgroup hierarchy is queried to discover the container ID that owns the process, if possible.
///
/// On all other platforms, `OnDemandPIDResolver` is a no-op and doesn't perform any resolution.
#[derive(Clone)]
pub struct OnDemandPIDResolver {
    /// Platform-specific implementation, reference-counted so that every clone shares it.
    inner: Arc<Inner>,
}
77
78impl OnDemandPIDResolver {
79    /// Creates a new `OnDemandPIDResolver` from the given configuration.
80    #[cfg(not(target_os = "linux"))]
81    pub fn from_configuration(
82        _config: &GenericConfiguration, _feature_detector: FeatureDetector, _interner: GenericMapInterner,
83    ) -> Result<Self, GenericError> {
84        // On non-Linux platforms, we don't need to do anything special.
85        Ok(Self {
86            inner: Arc::new(Inner::Noop),
87        })
88    }
89
90    /// Creates a new `OnDemandPIDResolver` from the given configuration.
91    ///
92    /// ## Errors
93    ///
94    /// If a cgroups hierarchy can't be found, or the internal cache can't be created, an error is returned.
95    #[cfg(target_os = "linux")]
96    pub fn from_configuration(
97        config: &GenericConfiguration, feature_detector: FeatureDetector, interner: GenericMapInterner,
98    ) -> Result<Self, GenericError> {
99        use stringtheory::interning::Interner as _;
100
101        let telemetry = Telemetry::new();
102        telemetry
103            .interner_capacity_bytes()
104            .set(interner.capacity_bytes() as f64);
105
106        let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
107        let cgroups_reader = match CgroupsReader::try_from_config(&cgroups_config, interner.clone())? {
108            Some(reader) => reader,
109            None => {
110                return Err(GenericError::msg("Failed to detect any cgroups v1/v2 hierarchy."));
111            }
112        };
113
114        let cache_builder = CacheBuilder::from_identifier("on_demand_pid_resolver")?
115            .with_capacity(NonZeroUsize::new(DEFAULT_PID_CACHE_CACHED_PIDS_LIMIT).unwrap())
116            .with_time_to_idle(Some(DEFAULT_PID_CACHE_IDLE_PID_EXPIRATION));
117
118        let inner = Arc::new(Inner::Linux {
119            cgroups_reader,
120            pid_mappings_cache: cache_builder.build(),
121        });
122
123        tokio::spawn(drive_telemetry(interner.clone(), telemetry.clone()));
124
125        Ok(Self { inner })
126    }
127
128    /// Resolves a process ID to the container ID of the container is part of.
129    ///
130    /// If the process ID isn't part of a container, or can't be found, `None` is returned.
131    pub fn resolve(&self, process_id: u32) -> Option<EntityId> {
132        self.inner.resolve(process_id)
133    }
134}
135
136#[cfg(target_os = "linux")]
137async fn drive_telemetry(interner: GenericMapInterner, telemetry: Telemetry) {
138    use stringtheory::interning::Interner as _;
139
140    loop {
141        sleep(Duration::from_secs(1)).await;
142
143        telemetry.interner_entries().set(interner.len() as f64);
144        telemetry
145            .interner_capacity_bytes()
146            .set(interner.capacity_bytes() as f64);
147        telemetry.interner_len_bytes().set(interner.len_bytes() as f64);
148    }
149}
150
151fn resolve_noop_pid(_process_id: u32) -> Option<EntityId> {
152    // No-op resolver, always returns None.
153    None
154}
155
156#[cfg(target_os = "linux")]
157fn resolve_linux_pid(
158    process_id: u32, pid_mappings_cache: &PIDCache, cgroups_reader: &CgroupsReader,
159) -> Option<EntityId> {
160    // First, check our PID mapping cache.
161    if let Some(container_id) = pid_mappings_cache.get(&process_id) {
162        trace!(
163            "Resolved PID {} to container ID {} from cache.",
164            process_id,
165            container_id
166        );
167        return Some(container_id);
168    }
169
170    // If we don't have a mapping, query the host OS for it.
171    match cgroups_reader.get_cgroup_by_pid(process_id) {
172        Some(cgroup) => {
173            let container_eid = EntityId::Container(cgroup.into_container_id());
174
175            debug!("Resolved PID {} to container ID {}.", process_id, container_eid);
176
177            pid_mappings_cache.insert(process_id, container_eid.clone());
178            Some(container_eid)
179        }
180        None => {
181            debug!(
182                "Failed to resolve container ID for PID {}. Process ID may not be part of a container.",
183                process_id
184            );
185            None
186        }
187    }
188}