1use std::{
4 collections::{HashMap, VecDeque},
5 env, fs,
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use bytesize::ByteSize;
11use memory_accounting::{
12 allocator::{AllocationGroupRegistry, AllocationStats, AllocationStatsSnapshot},
13 ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter,
14};
15use metrics::{counter, gauge, Counter, Gauge, Level};
16use saluki_api::{DynamicRoute, EndpointType};
17use saluki_config::GenericConfiguration;
18use saluki_core::runtime::{
19 state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
20};
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use serde::Deserialize;
23use tokio::{pin, select, time::sleep};
24use tracing::{error, info, warn};
25
/// Serde default for `MemoryBoundsConfiguration::memory_slop_factor`.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_SLOP_FACTOR
}
29
/// Serde default for `MemoryBoundsConfiguration::enable_global_limiter`.
const fn default_enable_global_limiter() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
33
34#[derive(Deserialize)]
36pub struct MemoryBoundsConfiguration {
37 #[serde(default)]
44 memory_limit: Option<ByteSize>,
45
46 #[serde(default = "default_memory_slop_factor")]
57 memory_slop_factor: f64,
58
59 #[serde(default = "default_enable_global_limiter")]
66 enable_global_limiter: bool,
67}
68
69impl MemoryBoundsConfiguration {
70 pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
76 let mut config = config
77 .as_typed::<Self>()
78 .error_context("Failed to parse memory bounds configuration.")?;
79
80 if config.memory_limit.is_none() {
81 if let Ok(value) = env::var("DOCKER_DD_AGENT") {
83 if !value.is_empty() {
84 let cgroup_memory_reader = CgroupMemoryParser;
85 if let Some(memory) = cgroup_memory_reader.parse() {
86 info!(
87 "Setting memory limit to {} based on detected cgroups limit.",
88 memory.display().si()
89 );
90 config.memory_limit = Some(memory);
91 }
92 }
93 }
94 }
95
96 if let Some(limit) = config.memory_limit {
98 let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
99 .error_context("Given memory limit and/or slop factor invalid.")?;
100 }
101
102 Ok(config)
103 }
104
105 pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
107 self.memory_limit.map(|limit| {
108 MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
109 .expect("memory limit should be valid")
110 })
111 }
112}
113
114pub fn initialize_memory_bounds(
126 configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
127) -> Result<MemoryLimiter, GenericError> {
128 let initial_grant = match configuration.memory_limit {
129 Some(limit) => MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor)?,
130 None => {
131 info!("No memory limit set for the process. Skipping memory bounds verification.");
132 return Ok(MemoryLimiter::noop());
133 }
134 };
135
136 let verified_bounds = match component_registry.verify_bounds(initial_grant) {
137 Ok(verified_bounds) => verified_bounds,
138 Err(e) => {
139 error!("Failed to verify memory bounds: {}.", e);
140
141 let bounds = component_registry.as_bounds();
142 print_bounds(&bounds);
143
144 return Err(generic_error!(
145 "Configured memory limit is insufficient for the current configuration."
146 ));
147 }
148 };
149
150 let limiter = if configuration.enable_global_limiter {
151 MemoryLimiter::new(initial_grant)
152 .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))
153 } else {
154 Ok(MemoryLimiter::noop())
155 }?;
156
157 info!(
158 "Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
159 bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
160 bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
161 bytes_to_si_string(verified_bounds.total_available_bytes()),
162 bytes_to_si_string(initial_grant.initial_limit_bytes()),
163 );
164
165 print_bounds(verified_bounds.bounds());
166
167 Ok(limiter)
168}
169
170fn print_bounds(bounds: &ComponentBounds) {
171 info!("Breakdown of verified bounds:");
172 info!(
173 "- (root): {} minimum, {} firm",
174 bytes_to_si_string(bounds.total_minimum_required_bytes()),
175 bytes_to_si_string(bounds.total_firm_limit_bytes()),
176 );
177
178 let mut to_visit = VecDeque::new();
179 to_visit.extend(
180 bounds
181 .subcomponents()
182 .into_iter()
183 .map(|(name, bounds)| (1, name, bounds)),
184 );
185
186 while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
187 info!(
188 "{:indent$}- {}: {} minimum, {} firm",
189 "",
190 component_name,
191 bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
192 bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
193 indent = depth * 2
194 );
195
196 let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
197 while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
198 to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
199 }
200 }
201
202 info!("");
203}
204
205struct AllocationGroupMetrics {
206 totals: AllocationStatsSnapshot,
207 allocated_bytes_total: Counter,
208 allocated_bytes_live: Gauge,
209 allocated_objects_total: Counter,
210 allocated_objects_live: Gauge,
211 deallocated_bytes_total: Counter,
212 deallocated_objects_total: Counter,
213}
214
215impl AllocationGroupMetrics {
216 fn new(group_name: &str) -> Self {
217 Self {
218 totals: AllocationStatsSnapshot::empty(),
219 allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
220 allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
221 allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
222 allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
223 deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
224 deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
225 }
226 }
227
228 fn update(&mut self, stats: &AllocationStats) {
229 let delta = stats.snapshot_delta(&self.totals);
230
231 self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
232 self.allocated_objects_total.increment(delta.allocated_objects as u64);
233 self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
234 self.deallocated_objects_total
235 .increment(delta.deallocated_objects as u64);
236
237 self.totals.merge(&delta);
238 self.allocated_bytes_live
239 .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
240 self.allocated_objects_live
241 .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
242 }
243}
244
245pub struct AllocationTelemetryWorker {
250 component_registry: ComponentRegistryHandle,
251}
252
253impl AllocationTelemetryWorker {
254 pub fn new(component_registry: &ComponentRegistry) -> Self {
256 Self {
257 component_registry: component_registry.root(),
258 }
259 }
260}
261
262#[async_trait]
263impl Supervisable for AllocationTelemetryWorker {
264 fn name(&self) -> &str {
265 "alloc-telemetry"
266 }
267
268 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
269 if !AllocationGroupRegistry::allocator_installed() {
273 warn!("Tracking allocator not installed. Memory telemetry will not be available.");
274 }
275
276 let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
277
278 Ok(Box::pin(async move {
279 DataspaceRegistry::try_current()
281 .ok_or_else(|| generic_error!("Dataspace not available."))?
282 .assert(memory_routes, "alloc-telemetry-api");
283
284 let mut metrics = HashMap::new();
285
286 let shutdown = process_shutdown.wait_for_shutdown();
287 pin!(shutdown);
288
289 loop {
290 select! {
291 _ = &mut shutdown => break,
292 _ = sleep(Duration::from_secs(1)) => {
293 AllocationGroupRegistry::global().visit_allocation_groups(|group_name, stats| {
294 let group_metrics = match metrics.get_mut(group_name) {
295 Some(group_metrics) => group_metrics,
296 None => metrics
297 .entry(group_name.to_string())
298 .or_insert_with(|| AllocationGroupMetrics::new(group_name)),
299 };
300
301 group_metrics.update(stats);
302 });
303 }
304 }
305 }
306
307 Ok(())
308 }))
309 }
310}
311
312struct CgroupMemoryParser;
313
314impl CgroupMemoryParser {
315 fn parse(self) -> Option<ByteSize> {
319 let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
320 let parts: Vec<&str> = contents.trim().split("\n").collect();
321 if parts.len() == 1 {
323 return self.parse_controller_v2(parts[0]);
324 }
325 for line in parts {
326 if line.contains(":memory:") {
327 return self.parse_controller_v1(line);
328 }
329 }
330 None
331 }
332
333 fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
334 let path = controller.split(":").nth(2)?;
335 let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
336 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
337 self.convert_to_bytesize(&raw_memory_limit)
338 }
339
340 fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
341 let path = controller.split(":").nth(2)?;
342 let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
343 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
344 self.convert_to_bytesize(&raw_memory_limit)
345 }
346
347 fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
348 let memory = s.trim().to_string();
349 if memory == "max" {
350 return None;
351 }
352 memory.parse::<ByteSize>().ok()
353 }
354}
355
356fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
357 ByteSize::b(bytes as u64).display().si()
358}