saluki_env/workload/collectors/
cgroups.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::{
6 collections::{FastHashMap, FastHashSet},
7 sync::shutdown::ShutdownHandle,
8};
9use saluki_config::GenericConfiguration;
10use saluki_core::health::Health;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use stringtheory::{interning::GenericMapInterner, MetaString};
13use tokio::{pin, select, sync::mpsc};
14use tracing::{debug, warn};
15
16use super::MetadataCollector;
17use crate::{
18 features::FeatureDetector,
19 workload::{
20 entity::EntityId,
21 helpers::cgroups::{CgroupsConfiguration, CgroupsReader},
22 metadata::MetadataOperation,
23 },
24};
25
26pub struct CgroupsMetadataCollector {
36 reader: CgroupsReader,
37 health: Health,
38}
39
40impl CgroupsMetadataCollector {
41 pub async fn from_configuration(
47 config: &GenericConfiguration, feature_detector: FeatureDetector, health: Health, interner: GenericMapInterner,
48 ) -> Result<Self, GenericError> {
49 let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
50 let reader = match CgroupsReader::try_from_config(&cgroups_config, interner)? {
51 Some(reader) => reader,
52 None => {
53 return Err(generic_error!("Failed to detect any cgroups v1/v2 hierarchy. "));
54 }
55 };
56
57 Ok(Self { reader, health })
58 }
59}
60
61#[async_trait]
62impl MetadataCollector for CgroupsMetadataCollector {
63 fn name(&self) -> &'static str {
64 "cgroups"
65 }
66
67 async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
68 self.health.mark_ready();
69
70 let mut cgroups_manager = SynchronousCgroupsManager::from_reader(self.reader.clone());
74 let operations_tx = operations_tx.clone();
75
76 let (_shutdown_coordinator, shutdown_handle) = ShutdownHandle::paired();
79 let poller_handle = tokio::task::spawn_blocking(move || cgroups_manager.poll(operations_tx, shutdown_handle));
80 pin!(poller_handle);
81
82 debug!("Spawned cgroups background poller task.");
83
84 let final_result = loop {
85 select! {
86 _ = self.health.live() => {},
87 result = &mut poller_handle => match result {
88 Ok(Ok(())) => break Ok(()),
89 Ok(Err(e)) => break Err(e).error_context("Cgroups background poller task encountered an error."),
90 Err(e) => break Err(e).error_context("Cgroups background poller task panicked."),
91 }
92 }
93 };
94
95 self.health.mark_not_ready();
96
97 final_result
98 }
99}
100
101impl MemoryBounds for CgroupsMetadataCollector {
102 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
103 builder
104 .minimum()
105 .with_array::<MetadataOperation>("metadata operations", 64);
107 builder.firm().with_single_value::<Self>("component struct");
114 }
115}
116
117struct SynchronousCgroupsManager {
118 reader: CgroupsReader,
119 active_cgroups: FastHashMap<u64, MetaString>,
120 operations: Vec<MetadataOperation>,
121}
122
123impl SynchronousCgroupsManager {
124 fn from_reader(reader: CgroupsReader) -> Self {
125 Self {
126 reader,
127 active_cgroups: FastHashMap::default(),
128 operations: Vec::with_capacity(64),
129 }
130 }
131
132 fn poll(
133 &mut self, operations_tx: mpsc::Sender<MetadataOperation>, shutdown_handle: ShutdownHandle,
134 ) -> Result<(), GenericError> {
135 let mut traversed_cgroups = FastHashSet::default();
136 let mut cgroups_to_delete = Vec::new();
137
138 loop {
139 if operations_tx.is_closed() || shutdown_handle.is_triggered() {
141 return Ok(());
142 }
143
144 traversed_cgroups.clear();
145
146 let start = std::time::Instant::now();
147
148 let child_cgroups = self.reader.get_child_cgroups();
151 let child_cgroups_len = child_cgroups.len();
152 for child_cgroup in child_cgroups {
153 if let Some(cgroup_inode) = child_cgroup.inode() {
154 traversed_cgroups.insert(cgroup_inode);
155
156 if !self.active_cgroups.contains_key(&cgroup_inode) {
158 let container_id = child_cgroup.into_container_id();
159 debug!(%cgroup_inode, %container_id, "Found new container-based cgroup.");
160
161 self.active_cgroups.insert(cgroup_inode, container_id.clone());
162
163 let entity_id = EntityId::ContainerInode(cgroup_inode);
165 let ancestor_entity_id = EntityId::Container(container_id);
166
167 let operation = MetadataOperation::add_alias(entity_id, ancestor_entity_id);
168 self.operations.push(operation);
169 }
170 } else {
171 let container_id = child_cgroup.into_container_id();
173 warn!(%container_id, "Encountered cgroup without controller inode during metadata traversal. This is unexpected.");
174 continue;
175 }
176 }
177
178 for cgroup_inode in self.active_cgroups.keys() {
180 if !traversed_cgroups.contains(cgroup_inode) {
181 cgroups_to_delete.push(*cgroup_inode);
183 }
184 }
185
186 for cgroup_inode in cgroups_to_delete.drain(..) {
188 if let Some(container_id) = self.active_cgroups.remove(&cgroup_inode) {
189 debug!(%cgroup_inode, %container_id, "Removing old container-based cgroup.");
190
191 let entity_id = EntityId::ContainerInode(cgroup_inode);
193 let ancestor_entity_id = EntityId::Container(container_id);
194
195 let operation = MetadataOperation::remove_alias(entity_id, ancestor_entity_id);
196 self.operations.push(operation);
197 } else {
198 warn!(%cgroup_inode, "Tried to remove a cgroup that was not in the active set.");
199 }
200 }
201
202 let elapsed = start.elapsed();
203 debug!(elapsed = ?elapsed, child_cgroups_len, "Traversed cgroups.");
204
205 for operation in self.operations.drain(..) {
207 if operations_tx.blocking_send(operation).is_err() {
208 return Err(GenericError::msg("Operations channel unexpectedly closed."));
209 }
210 }
211
212 if operations_tx.is_closed() || shutdown_handle.is_triggered() {
214 return Ok(());
215 }
216
217 std::thread::sleep(Duration::from_secs(2));
218 }
219 }
220}