1use std::{
4 collections::{HashMap, VecDeque},
5 env, fs,
6 sync::atomic::{AtomicBool, Ordering::Relaxed},
7 time::Duration,
8};
9
10use bytesize::ByteSize;
11use memory_accounting::{
12 allocator::{AllocationGroupRegistry, AllocationStats, AllocationStatsSnapshot},
13 ComponentBounds, ComponentRegistry, MemoryGrant, MemoryLimiter,
14};
15use metrics::{counter, gauge, Counter, Gauge, Level};
16use saluki_common::task::spawn_traced_named;
17use saluki_config::GenericConfiguration;
18use saluki_error::{generic_error, ErrorContext as _, GenericError};
19use serde::Deserialize;
20use tokio::time::sleep;
21use tracing::{error, info, warn};
22
/// Serde default for `MemoryBoundsConfiguration::memory_slop_factor`.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_MEMORY_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_MEMORY_SLOP_FACTOR
}
26
/// Serde default for `MemoryBoundsConfiguration::enable_global_limiter`.
const fn default_enable_global_limiter() -> bool {
    const GLOBAL_LIMITER_ENABLED_BY_DEFAULT: bool = true;
    GLOBAL_LIMITER_ENABLED_BY_DEFAULT
}
30
/// Deserializable settings that control memory bounds verification and limiting.
#[derive(Deserialize)]
pub struct MemoryBoundsConfiguration {
    /// Memory limit for the process.
    ///
    /// Optional. When absent after deserialization, `try_from_config` may fill it in from the
    /// detected cgroup limit; when it stays unset, `initialize_memory_bounds` skips verification.
    #[serde(default)]
    memory_limit: Option<ByteSize>,

    /// Slop factor handed to `MemoryGrant::with_slop_factor` together with the memory limit.
    ///
    /// Defaults to the value returned by `default_memory_slop_factor` (0.25).
    #[serde(default = "default_memory_slop_factor")]
    memory_slop_factor: f64,

    /// Whether `initialize_memory_bounds` builds a real `MemoryLimiter` (`true`) or a no-op one
    /// (`false`).
    ///
    /// Defaults to the value returned by `default_enable_global_limiter` (`true`).
    #[serde(default = "default_enable_global_limiter")]
    enable_global_limiter: bool,
}
65
66impl MemoryBoundsConfiguration {
67 pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
73 let mut config = config
74 .as_typed::<Self>()
75 .error_context("Failed to parse memory bounds configuration.")?;
76
77 if config.memory_limit.is_none() {
78 if let Ok(value) = env::var("DOCKER_DD_AGENT") {
80 if !value.is_empty() {
81 let cgroup_memory_reader = CgroupMemoryParser;
82 if let Some(memory) = cgroup_memory_reader.parse() {
83 info!(
84 "Setting memory limit to {} based on detected cgroups limit.",
85 memory.display().si()
86 );
87 config.memory_limit = Some(memory);
88 }
89 }
90 }
91 }
92
93 if let Some(limit) = config.memory_limit {
95 let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
96 .error_context("Given memory limit and/or slop factor invalid.")?;
97 }
98
99 Ok(config)
100 }
101
102 pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
104 self.memory_limit.map(|limit| {
105 MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
106 .expect("memory limit should be valid")
107 })
108 }
109}
110
111pub fn initialize_memory_bounds(
123 configuration: MemoryBoundsConfiguration, component_registry: &ComponentRegistry,
124) -> Result<MemoryLimiter, GenericError> {
125 let initial_grant = match configuration.memory_limit {
126 Some(limit) => MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor)?,
127 None => {
128 info!("No memory limit set for the process. Skipping memory bounds verification.");
129 return Ok(MemoryLimiter::noop());
130 }
131 };
132
133 let verified_bounds = match component_registry.verify_bounds(initial_grant) {
134 Ok(verified_bounds) => verified_bounds,
135 Err(e) => {
136 error!("Failed to verify memory bounds: {}.", e);
137
138 let bounds = component_registry.as_bounds();
139 print_bounds(&bounds);
140
141 return Err(generic_error!(
142 "Configured memory limit is insufficient for the current configuration."
143 ));
144 }
145 };
146
147 let limiter = if configuration.enable_global_limiter {
148 MemoryLimiter::new(initial_grant)
149 .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))
150 } else {
151 Ok(MemoryLimiter::noop())
152 }?;
153
154 info!(
155 "Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
156 bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
157 bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
158 bytes_to_si_string(verified_bounds.total_available_bytes()),
159 bytes_to_si_string(initial_grant.initial_limit_bytes()),
160 );
161
162 print_bounds(verified_bounds.bounds());
163
164 Ok(limiter)
165}
166
167fn print_bounds(bounds: &ComponentBounds) {
168 info!("Breakdown of verified bounds:");
169 info!(
170 "- (root): {} minimum, {} firm",
171 bytes_to_si_string(bounds.total_minimum_required_bytes()),
172 bytes_to_si_string(bounds.total_firm_limit_bytes()),
173 );
174
175 let mut to_visit = VecDeque::new();
176 to_visit.extend(
177 bounds
178 .subcomponents()
179 .into_iter()
180 .map(|(name, bounds)| (1, name, bounds)),
181 );
182
183 while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
184 info!(
185 "{:indent$}- {}: {} minimum, {} firm",
186 "",
187 component_name,
188 bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
189 bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
190 indent = depth * 2
191 );
192
193 let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
194 while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
195 to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
196 }
197 }
198
199 info!("");
200}
201
/// Metric handles for one allocation group, plus the totals already reported for it.
struct AllocationGroupMetrics {
    /// Running totals accumulated from every delta applied so far; the next delta is computed
    /// against this snapshot.
    totals: AllocationStatsSnapshot,
    /// `group_allocated_bytes_total` counter, incremented by each delta's allocated bytes.
    allocated_bytes_total: Counter,
    /// `group_allocated_bytes_live` gauge, set to total allocated bytes minus total deallocated bytes.
    allocated_bytes_live: Gauge,
    /// `group_allocated_objects_total` counter, incremented by each delta's allocated objects.
    allocated_objects_total: Counter,
    /// `group_allocated_objects_live` gauge, set to total allocated objects minus total deallocated
    /// objects.
    allocated_objects_live: Gauge,
    /// `group_deallocated_bytes_total` counter, incremented by each delta's deallocated bytes.
    deallocated_bytes_total: Counter,
    /// `group_deallocated_objects_total` counter, incremented by each delta's deallocated objects.
    deallocated_objects_total: Counter,
}
211
212impl AllocationGroupMetrics {
213 fn new(group_name: &str) -> Self {
214 Self {
215 totals: AllocationStatsSnapshot::empty(),
216 allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
217 allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
218 allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
219 allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
220 deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
221 deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
222 }
223 }
224
225 fn update(&mut self, stats: &AllocationStats) {
226 let delta = stats.snapshot_delta(&self.totals);
227
228 self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
229 self.allocated_objects_total.increment(delta.allocated_objects as u64);
230 self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
231 self.deallocated_objects_total
232 .increment(delta.deallocated_objects as u64);
233
234 self.totals.merge(&delta);
235 self.allocated_bytes_live
236 .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
237 self.allocated_objects_live
238 .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
239 }
240}
241
242pub(crate) async fn initialize_allocator_telemetry() -> Result<(), GenericError> {
251 static INIT: AtomicBool = AtomicBool::new(false);
253 if INIT.swap(true, Relaxed) {
254 return Err(generic_error!("Memory allocator subsystem already initialized."));
255 }
256
257 if !AllocationGroupRegistry::allocator_installed() {
261 warn!("Tracking allocator not installed. Memory telemetry will not be available.");
262 }
263
264 spawn_traced_named("allocator-telemetry-collector", async {
266 let mut metrics = HashMap::new();
267
268 loop {
269 sleep(Duration::from_secs(1)).await;
270
271 AllocationGroupRegistry::global().visit_allocation_groups(|group_name, stats| {
272 let group_metrics = match metrics.get_mut(group_name) {
273 Some(group_metrics) => group_metrics,
274 None => metrics
275 .entry(group_name.to_string())
276 .or_insert_with(|| AllocationGroupMetrics::new(group_name)),
277 };
278
279 group_metrics.update(stats);
280 });
281 }
282 });
283
284 Ok(())
285}
286
/// Stateless helper that looks up the memory limit of the current process's cgroup.
struct CgroupMemoryParser;
288
289impl CgroupMemoryParser {
290 fn parse(self) -> Option<ByteSize> {
294 let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
295 let parts: Vec<&str> = contents.trim().split("\n").collect();
296 if parts.len() == 1 {
298 return self.parse_controller_v2(parts[0]);
299 }
300 for line in parts {
301 if line.contains(":memory:") {
302 return self.parse_controller_v1(line);
303 }
304 }
305 None
306 }
307
308 fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
309 let path = controller.split(":").nth(2)?;
310 let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
311 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
312 self.convert_to_bytesize(&raw_memory_limit)
313 }
314
315 fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
316 let path = controller.split(":").nth(2)?;
317 let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
318 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
319 self.convert_to_bytesize(&raw_memory_limit)
320 }
321
322 fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
323 let memory = s.trim().to_string();
324 if memory == "max" {
325 return None;
326 }
327 memory.parse::<ByteSize>().ok()
328 }
329}
330
331fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
332 ByteSize::b(bytes as u64).display().si()
333}