Skip to main content

saluki_env/workload/collectors/
cgroups.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::{
6    collections::{FastHashMap, FastHashSet},
7    sync::shutdown::ShutdownHandle,
8};
9use saluki_config::GenericConfiguration;
10use saluki_core::health::Health;
11use saluki_error::{generic_error, ErrorContext as _, GenericError};
12use stringtheory::{interning::GenericMapInterner, MetaString};
13use tokio::{pin, select, sync::mpsc};
14use tracing::{debug, warn};
15
16use super::MetadataCollector;
17use crate::{
18    features::FeatureDetector,
19    workload::{
20        entity::EntityId,
21        helpers::cgroups::{CgroupsConfiguration, CgroupsReader},
22        metadata::MetadataOperation,
23    },
24};
25
26/// A metadata collector that observes Linux "Control Groups" (cgroups).
27///
28/// This collector specifically tracks cgroup controllers attached to container workloads using simple regex-based
29/// matching against the controller path, and maintains a mapping of controller inodes to the container ID extracted
30/// from the controller path.
31///
32/// This is specifically used to support client-based Origin Detection in DogStatsD, where clients will either send
33/// their detected container ID _or_ the inode of their cgroup controller. A canonical container ID must always be used
34/// for origin enrichment, so this mapping allows resolving controller inodes to their canonical container ID.
35pub struct CgroupsMetadataCollector {
36    reader: CgroupsReader,
37    health: Health,
38}
39
40impl CgroupsMetadataCollector {
41    /// Creates a new `CgroupsMetadataCollector` from the given configuration.
42    ///
43    /// # Errors
44    ///
45    /// If a valid cgroups hierarchy can not be located at the configured path, an error will be returned.
46    pub async fn from_configuration(
47        config: &GenericConfiguration, feature_detector: FeatureDetector, health: Health, interner: GenericMapInterner,
48    ) -> Result<Self, GenericError> {
49        let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
50        let reader = match CgroupsReader::try_from_config(&cgroups_config, interner)? {
51            Some(reader) => reader,
52            None => {
53                return Err(generic_error!("Failed to detect any cgroups v1/v2 hierarchy. "));
54            }
55        };
56
57        Ok(Self { reader, health })
58    }
59}
60
61#[async_trait]
62impl MetadataCollector for CgroupsMetadataCollector {
63    fn name(&self) -> &'static str {
64        "cgroups"
65    }
66
67    async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
68        self.health.mark_ready();
69
70        // Drive a blocking background task that polls the cgroups hierarchy on a regular interval, and sends metadata
71        // updates when cgroups are created or deleted. We do this in a blocking task since all of the I/O operations
72        // are synchronous.
73        let mut cgroups_manager = SynchronousCgroupsManager::from_reader(self.reader.clone());
74        let operations_tx = operations_tx.clone();
75
76        // We hold on to the shutdown coordinator here (even though we never call it) so that it only triggers on drop,
77        // ensuring we don't leak the blocking poller task if this collector is dropped before it completes.
78        let (_shutdown_coordinator, shutdown_handle) = ShutdownHandle::paired();
79        let poller_handle = tokio::task::spawn_blocking(move || cgroups_manager.poll(operations_tx, shutdown_handle));
80        pin!(poller_handle);
81
82        debug!("Spawned cgroups background poller task.");
83
84        let final_result = loop {
85            select! {
86                _ = self.health.live() => {},
87                result = &mut poller_handle => match result {
88                    Ok(Ok(())) => break Ok(()),
89                    Ok(Err(e)) => break Err(e).error_context("Cgroups background poller task encountered an error."),
90                    Err(e) => break Err(e).error_context("Cgroups background poller task panicked."),
91                }
92            }
93        };
94
95        self.health.mark_not_ready();
96
97        final_result
98    }
99}
100
101impl MemoryBounds for CgroupsMetadataCollector {
102    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
103        builder
104            .minimum()
105            // Pre-allocated operation batch buffer. This is only the minimum, as it could grow larger.
106            .with_array::<MetadataOperation>("metadata operations", 64);
107        // TODO: Kind of a throwaway calculation because nothing about the reader can really be bounded at the moment.
108        //
109        // Specifically, we don't know the number of cgroups that will be present... and we have both a map that holds
110        // the active cgroups _and_ a map to track the cgroups seen during a single traversal, which we need to
111        // determine which cgroups have been removed. This means we might end up with like 3 copies of the same cgroup
112        // times however many cgroups there are at peak.
113        builder.firm().with_single_value::<Self>("component struct");
114    }
115}
116
117struct SynchronousCgroupsManager {
118    reader: CgroupsReader,
119    active_cgroups: FastHashMap<u64, MetaString>,
120    operations: Vec<MetadataOperation>,
121}
122
123impl SynchronousCgroupsManager {
124    fn from_reader(reader: CgroupsReader) -> Self {
125        Self {
126            reader,
127            active_cgroups: FastHashMap::default(),
128            operations: Vec::with_capacity(64),
129        }
130    }
131
132    fn poll(
133        &mut self, operations_tx: mpsc::Sender<MetadataOperation>, shutdown_handle: ShutdownHandle,
134    ) -> Result<(), GenericError> {
135        let mut traversed_cgroups = FastHashSet::default();
136        let mut cgroups_to_delete = Vec::new();
137
138        loop {
139            // Make sure we should still be running.
140            if operations_tx.is_closed() || shutdown_handle.is_triggered() {
141                return Ok(());
142            }
143
144            traversed_cgroups.clear();
145
146            let start = std::time::Instant::now();
147
148            // Traverse the cgroups hierarchy and collect all child cgroups that we can find that are attached to a
149            // container and have a controller inode for us to attach an alias to.
150            let child_cgroups = self.reader.get_child_cgroups();
151            let child_cgroups_len = child_cgroups.len();
152            for child_cgroup in child_cgroups {
153                if let Some(cgroup_inode) = child_cgroup.inode() {
154                    traversed_cgroups.insert(cgroup_inode);
155
156                    // If we haven't seen this cgroup before, start tracking it.
157                    if !self.active_cgroups.contains_key(&cgroup_inode) {
158                        let container_id = child_cgroup.into_container_id();
159                        debug!(%cgroup_inode, %container_id, "Found new container-based cgroup.");
160
161                        self.active_cgroups.insert(cgroup_inode, container_id.clone());
162
163                        // Emit an operation to add an alias between the cgroup inode and the container ID.
164                        let entity_id = EntityId::ContainerInode(cgroup_inode);
165                        let ancestor_entity_id = EntityId::Container(container_id);
166
167                        let operation = MetadataOperation::add_alias(entity_id, ancestor_entity_id);
168                        self.operations.push(operation);
169                    }
170                } else {
171                    // If the cgroup has no inode, we can't track it.
172                    let container_id = child_cgroup.into_container_id();
173                    warn!(%container_id, "Encountered cgroup without controller inode during metadata traversal. This is unexpected.");
174                    continue;
175                }
176            }
177
178            // Figure out which cgroups are no longer active and mark them for deletion.
179            for cgroup_inode in self.active_cgroups.keys() {
180                if !traversed_cgroups.contains(cgroup_inode) {
181                    // This cgroup is no longer present, so we need to delete it.
182                    cgroups_to_delete.push(*cgroup_inode);
183                }
184            }
185
186            // Process the deletions.
187            for cgroup_inode in cgroups_to_delete.drain(..) {
188                if let Some(container_id) = self.active_cgroups.remove(&cgroup_inode) {
189                    debug!(%cgroup_inode, %container_id, "Removing old container-based cgroup.");
190
191                    // Emit a metadata operation to remove the alias between the cgroup inode and the container ID.
192                    let entity_id = EntityId::ContainerInode(cgroup_inode);
193                    let ancestor_entity_id = EntityId::Container(container_id);
194
195                    let operation = MetadataOperation::remove_alias(entity_id, ancestor_entity_id);
196                    self.operations.push(operation);
197                } else {
198                    warn!(%cgroup_inode, "Tried to remove a cgroup that was not in the active set.");
199                }
200            }
201
202            let elapsed = start.elapsed();
203            debug!(elapsed = ?elapsed, child_cgroups_len, "Traversed cgroups.");
204
205            // Send all collected operations to the channel.
206            for operation in self.operations.drain(..) {
207                if operations_tx.blocking_send(operation).is_err() {
208                    return Err(GenericError::msg("Operations channel unexpectedly closed."));
209                }
210            }
211
212            // Check again if we should shutdown before we go to sleep.
213            if operations_tx.is_closed() || shutdown_handle.is_triggered() {
214                return Ok(());
215            }
216
217            std::thread::sleep(Duration::from_secs(2));
218        }
219    }
220}