Skip to main content

saluki_env/workload/collectors/cgroups.rs

1use std::time::Duration;
2
3use async_trait::async_trait;
4use resource_accounting::{MemoryBounds, MemoryBoundsBuilder};
5use saluki_common::collections::{FastHashMap, FastHashSet};
6use saluki_config::GenericConfiguration;
7use saluki_core::health::Health;
8use saluki_error::{generic_error, ErrorContext as _, GenericError};
9use stringtheory::{interning::GenericMapInterner, MetaString};
10use tokio::{select, sync::mpsc};
11use tokio_util::task::AbortOnDropHandle;
12use tracing::{debug, warn};
13
14use super::MetadataCollector;
15use crate::{
16    features::FeatureDetector,
17    workload::{
18        entity::EntityId,
19        helpers::cgroups::{CgroupsConfiguration, CgroupsReader},
20        metadata::MetadataOperation,
21    },
22};
23
24/// A metadata collector that observes Linux "Control Groups" (cgroups).
25///
26/// This collector specifically tracks cgroup controllers attached to container workloads using simple regex-based
27/// matching against the controller path, and maintains a mapping of controller inodes to the container ID extracted
28/// from the controller path.
29///
30/// This is specifically used to support client-based Origin Detection in DogStatsD, where clients will either send
31/// their detected container ID _or_ the inode of their cgroup controller. A canonical container ID must always be used
32/// for origin enrichment, so this mapping allows resolving controller inodes to their canonical container ID.
33pub struct CgroupsMetadataCollector {
34    reader: CgroupsReader,
35    health: Health,
36}
37
38impl CgroupsMetadataCollector {
39    /// Creates a new `CgroupsMetadataCollector` from the given configuration.
40    ///
41    /// # Errors
42    ///
43    /// If a valid cgroups hierarchy can not be located at the configured path, an error will be returned.
44    pub async fn from_configuration(
45        config: &GenericConfiguration, feature_detector: FeatureDetector, health: Health, interner: GenericMapInterner,
46    ) -> Result<Self, GenericError> {
47        let cgroups_config = CgroupsConfiguration::from_configuration(config, feature_detector)?;
48        let reader = match CgroupsReader::try_from_config(&cgroups_config, interner)? {
49            Some(reader) => reader,
50            None => {
51                return Err(generic_error!("Failed to detect any cgroups v1/v2 hierarchy. "));
52            }
53        };
54
55        Ok(Self { reader, health })
56    }
57}
58
59#[async_trait]
60impl MetadataCollector for CgroupsMetadataCollector {
61    fn name(&self) -> &'static str {
62        "cgroups"
63    }
64
65    async fn watch(&mut self, operations_tx: &mut mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
66        self.health.mark_ready();
67
68        // Drive a blocking background task that polls the cgroups hierarchy on a regular interval, and sends metadata
69        // updates when cgroups are created or deleted. We do this in a blocking task since all of the I/O operations
70        // are synchronous.
71
72        let mut cgroups_manager = SynchronousCgroupsManager::from_reader(self.reader.clone());
73        let operations_tx = operations_tx.clone();
74
75        let raw_poller_handle = tokio::task::spawn_blocking(move || cgroups_manager.poll(operations_tx));
76        let poller_handle = AbortOnDropHandle::new(raw_poller_handle);
77        tokio::pin!(poller_handle);
78
79        debug!("Spawned cgroups background poller task.");
80
81        let final_result = loop {
82            select! {
83                _ = self.health.live() => {},
84                result = &mut poller_handle => match result {
85                    Ok(Ok(())) => break Ok(()),
86                    Ok(Err(e)) => break Err(e).error_context("Cgroups background poller task encountered an error."),
87                    Err(e) => break Err(e).error_context("Cgroups background poller task panicked."),
88                }
89            }
90        };
91
92        self.health.mark_not_ready();
93
94        final_result
95    }
96}
97
98impl MemoryBounds for CgroupsMetadataCollector {
99    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
100        builder
101            .minimum()
102            // Pre-allocated operation batch buffer. This is only the minimum, as it could grow larger.
103            .with_array::<MetadataOperation>("metadata operations", 64);
104        // TODO: Kind of a throwaway calculation because nothing about the reader can really be bounded at the moment.
105        //
106        // Specifically, we don't know the number of cgroups that will be present... and we have both a map that holds
107        // the active cgroups _and_ a map to track the cgroups seen during a single traversal, which we need to
108        // determine which cgroups have been removed. This means we might end up with like 3 copies of the same cgroup
109        // times however many cgroups there are at peak.
110        builder.firm().with_single_value::<Self>("component struct");
111    }
112}
113
114struct SynchronousCgroupsManager {
115    reader: CgroupsReader,
116    active_cgroups: FastHashMap<u64, MetaString>,
117    operations: Vec<MetadataOperation>,
118}
119
120impl SynchronousCgroupsManager {
121    fn from_reader(reader: CgroupsReader) -> Self {
122        Self {
123            reader,
124            active_cgroups: FastHashMap::default(),
125            operations: Vec::with_capacity(64),
126        }
127    }
128
129    fn poll(&mut self, operations_tx: mpsc::Sender<MetadataOperation>) -> Result<(), GenericError> {
130        let mut traversed_cgroups = FastHashSet::default();
131        let mut cgroups_to_delete = Vec::new();
132
133        loop {
134            // Make sure we should still be running.
135            if operations_tx.is_closed() {
136                return Ok(());
137            }
138
139            traversed_cgroups.clear();
140
141            let start = std::time::Instant::now();
142
143            // Traverse the cgroups hierarchy and collect all child cgroups that we can find that are attached to a
144            // container and have a controller inode for us to attach an alias to.
145            let child_cgroups = self.reader.get_child_cgroups();
146            let child_cgroups_len = child_cgroups.len();
147            for child_cgroup in child_cgroups {
148                if let Some(cgroup_inode) = child_cgroup.inode() {
149                    traversed_cgroups.insert(cgroup_inode);
150
151                    // If we haven't seen this cgroup before, start tracking it.
152                    if !self.active_cgroups.contains_key(&cgroup_inode) {
153                        let container_id = child_cgroup.into_container_id();
154                        debug!(%cgroup_inode, %container_id, "Found new container-based cgroup.");
155
156                        self.active_cgroups.insert(cgroup_inode, container_id.clone());
157
158                        // Emit an operation to add an alias between the cgroup inode and the container ID.
159                        let entity_id = EntityId::ContainerInode(cgroup_inode);
160                        let ancestor_entity_id = EntityId::Container(container_id);
161
162                        let operation = MetadataOperation::add_alias(entity_id, ancestor_entity_id);
163                        self.operations.push(operation);
164                    }
165                } else {
166                    // If the cgroup has no inode, we can't track it.
167                    let container_id = child_cgroup.into_container_id();
168                    warn!(%container_id, "Encountered cgroup without controller inode during metadata traversal. This is unexpected.");
169                    continue;
170                }
171            }
172
173            // Figure out which cgroups are no longer active and mark them for deletion.
174            for cgroup_inode in self.active_cgroups.keys() {
175                if !traversed_cgroups.contains(cgroup_inode) {
176                    // This cgroup is no longer present, so we need to delete it.
177                    cgroups_to_delete.push(*cgroup_inode);
178                }
179            }
180
181            // Process the deletions.
182            for cgroup_inode in cgroups_to_delete.drain(..) {
183                if let Some(container_id) = self.active_cgroups.remove(&cgroup_inode) {
184                    debug!(%cgroup_inode, %container_id, "Removing old container-based cgroup.");
185
186                    // Emit a metadata operation to remove the alias between the cgroup inode and the container ID.
187                    let entity_id = EntityId::ContainerInode(cgroup_inode);
188                    let ancestor_entity_id = EntityId::Container(container_id);
189
190                    let operation = MetadataOperation::remove_alias(entity_id, ancestor_entity_id);
191                    self.operations.push(operation);
192                } else {
193                    warn!(%cgroup_inode, "Tried to remove a cgroup that was not in the active set.");
194                }
195            }
196
197            let elapsed = start.elapsed();
198            debug!(elapsed = ?elapsed, child_cgroups_len, "Traversed cgroups.");
199
200            // Send all collected operations to the channel.
201            for operation in self.operations.drain(..) {
202                if operations_tx.blocking_send(operation).is_err() {
203                    return Err(GenericError::msg("Operations channel unexpectedly closed."));
204                }
205            }
206
207            std::thread::sleep(Duration::from_secs(2));
208        }
209    }
210}