saluki_env/workload/collectors/
cgroups.rs1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::collections::{FastHashMap, FastHashSet};
6use saluki_config::GenericConfiguration;
7use saluki_core::health::Health;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use stringtheory::{interning::GenericMapInterner, MetaString};
10use tokio::{select, sync::mpsc};
11use tokio_util::task::AbortOnDropHandle;
12use tracing::{debug, warn};
13
14use super::MetadataCollector;
15use crate::{
16 features::FeatureDetector,
17 workload::{
18 entity::EntityId,
19 helpers::cgroups::{CgroupsConfiguration, CgroupsReader},
20 metadata::MetadataOperation,
21 },
22};
23
24pub struct CgroupsMetadataCollector {
34 reader: CgroupsReader,
35 health: Health,
36}
37
38impl CgroupsMetadataCollector {
39 pub async fn from_configuration(
45 config: &GenericConfiguration, feature_detector: FeatureDetector, health: Health, interner: GenericMapInterner,
46 ) -> Result<Self, GenericError> {
47 let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
48 let reader = match CgroupsReader::try_from_config(&cgroups_config, interner)? {
49 Some(reader) => reader,
50 None => {
51 return Err(generic_error!("Failed to detect any cgroups v1/v2 hierarchy. "));
52 }
53 };
54
55 Ok(Self { reader, health })
56 }
57}
58
59#[async_trait]
60impl MetadataCollector for CgroupsMetadataCollector {
61 fn name(&self) -> &'static str {
62 "cgroups"
63 }
64
65 async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
66 self.health.mark_ready();
67
68 let mut cgroups_manager = SynchronousCgroupsManager::from_reader(self.reader.clone());
73 let operations_tx = operations_tx.clone();
74
75 let raw_poller_handle = tokio::task::spawn_blocking(move || cgroups_manager.poll(operations_tx));
76 let poller_handle = AbortOnDropHandle::new(raw_poller_handle);
77 tokio::pin!(poller_handle);
78
79 debug!("Spawned cgroups background poller task.");
80
81 let final_result = loop {
82 select! {
83 _ = self.health.live() => {},
84 result = &mut poller_handle => match result {
85 Ok(Ok(())) => break Ok(()),
86 Ok(Err(e)) => break Err(e).error_context("Cgroups background poller task encountered an error."),
87 Err(e) => break Err(e).error_context("Cgroups background poller task panicked."),
88 }
89 }
90 };
91
92 self.health.mark_not_ready();
93
94 final_result
95 }
96}
97
98impl MemoryBounds for CgroupsMetadataCollector {
99 fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
100 builder
101 .minimum()
102 .with_array::<MetadataOperation>("metadata operations", 64);
104 builder.firm().with_single_value::<Self>("component struct");
111 }
112}
113
114struct SynchronousCgroupsManager {
115 reader: CgroupsReader,
116 active_cgroups: FastHashMap<u64, MetaString>,
117 operations: Vec<MetadataOperation>,
118}
119
120impl SynchronousCgroupsManager {
121 fn from_reader(reader: CgroupsReader) -> Self {
122 Self {
123 reader,
124 active_cgroups: FastHashMap::default(),
125 operations: Vec::with_capacity(64),
126 }
127 }
128
129 fn poll(&mut self, operations_tx: mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
130 let mut traversed_cgroups = FastHashSet::default();
131 let mut cgroups_to_delete = Vec::new();
132
133 loop {
134 if operations_tx.is_closed() {
136 return Ok(());
137 }
138
139 traversed_cgroups.clear();
140
141 let start = std::time::Instant::now();
142
143 let child_cgroups = self.reader.get_child_cgroups();
146 let child_cgroups_len = child_cgroups.len();
147 for child_cgroup in child_cgroups {
148 if let Some(cgroup_inode) = child_cgroup.inode() {
149 traversed_cgroups.insert(cgroup_inode);
150
151 if !self.active_cgroups.contains_key(&cgroup_inode) {
153 let container_id = child_cgroup.into_container_id();
154 debug!(%cgroup_inode, %container_id, "Found new container-based cgroup.");
155
156 self.active_cgroups.insert(cgroup_inode, container_id.clone());
157
158 let entity_id = EntityId::ContainerInode(cgroup_inode);
160 let ancestor_entity_id = EntityId::Container(container_id);
161
162 let operation = MetadataOperation::add_alias(entity_id, ancestor_entity_id);
163 self.operations.push(operation);
164 }
165 } else {
166 let container_id = child_cgroup.into_container_id();
168 warn!(%container_id, "Encountered cgroup without controller inode during metadata traversal. This is unexpected.");
169 continue;
170 }
171 }
172
173 for cgroup_inode in self.active_cgroups.keys() {
175 if !traversed_cgroups.contains(cgroup_inode) {
176 cgroups_to_delete.push(*cgroup_inode);
178 }
179 }
180
181 for cgroup_inode in cgroups_to_delete.drain(..) {
183 if let Some(container_id) = self.active_cgroups.remove(&cgroup_inode) {
184 debug!(%cgroup_inode, %container_id, "Removing old container-based cgroup.");
185
186 let entity_id = EntityId::ContainerInode(cgroup_inode);
188 let ancestor_entity_id = EntityId::Container(container_id);
189
190 let operation = MetadataOperation::remove_alias(entity_id, ancestor_entity_id);
191 self.operations.push(operation);
192 } else {
193 warn!(%cgroup_inode, "Tried to remove a cgroup that was not in the active set.");
194 }
195 }
196
197 let elapsed = start.elapsed();
198 debug!(elapsed = ?elapsed, child_cgroups_len, "Traversed cgroups.");
199
200 for operation in self.operations.drain(..) {
202 if operations_tx.blocking_send(operation).is_err() {
203 return Err(GenericError::msg("Operations channel unexpectedly closed."));
204 }
205 }
206
207 std::thread::sleep(Duration::from_secs(2));
208 }
209 }
210}