saluki_app/
memory.rs

1//! Memory management.
2
3use std::{
4    collections::{HashMap, VecDeque},
5    env, fs,
6    sync::atomic::{AtomicBool, Ordering::Relaxed},
7    time::Duration,
8};
9
10use bytesize::ByteSize;
11use memory_accounting::{
12    allocator::{AllocationGroupRegistry, AllocationStats, AllocationStatsSnapshot},
13    ComponentBounds, ComponentRegistry, MemoryGrant, MemoryLimiter,
14};
15use metrics::{counter, gauge, Counter, Gauge, Level};
16use saluki_common::task::spawn_traced_named;
17use saluki_config::GenericConfiguration;
18use saluki_error::{generic_error, ErrorContext as _, GenericError};
19use serde::Deserialize;
20use tokio::time::sleep;
21use tracing::{error, info, warn};
22
/// Default value for `memory_slop_factor`: hold back a quarter of the configured memory limit.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_SLOP_FACTOR
}
26
/// Default value for `enable_global_limiter`: the global limiter is enabled unless explicitly turned off.
const fn default_enable_global_limiter() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
30
31/// Configuration for memory bounds.
32#[derive(Deserialize)]
33pub struct MemoryBoundsConfiguration {
34    /// The memory limit to adhere to.
35    ///
36    /// This should be the overall memory limit for the entire process. The value can either be an integer for
37    /// specifying the limit in bytes, or a string that uses SI byte prefixes (case-insensitive) such as `1mb` or `1GB`.
38    ///
39    /// If not specified, no memory bounds verification will be performed.
40    #[serde(default)]
41    memory_limit: Option<ByteSize>,
42
43    /// The slop factor to apply to the given memory limit.
44    ///
45    /// Memory bounds are inherently fuzzy, as components are required to manually define their bounds, and as such, can
46    /// only account for memory usage that they know about. The slop factor is applied as a reduction to the overall
47    /// memory limit, such that we account for the "known unknowns" -- memory that hasn't yet been accounted for -- by
48    /// simply ensuring that we can fit within a portion of the overall limit.
49    ///
50    /// Values between 0 to 1 are allowed, and represent the percentage of `memory_limit` that is held back. This means
51    /// that a slop factor of 0.25, for example, will cause 25% of `memory_limit` to be withheld. If `memory_limit` was
52    /// 100MB, we would then verify that the memory bounds can fit within 75MB (100MB * (1 - 0.25) => 75MB).
53    #[serde(default = "default_memory_slop_factor")]
54    memory_slop_factor: f64,
55
56    /// Whether or not to enable the global memory limiter.
57    ///
58    /// When set to `false`, the global memory limiter will operate in a no-op mode. All calls to use it will never exert
59    /// backpressure, and only the inherent memory bounds of the running components will influence memory usage.
60    ///
61    /// Defaults to `true`.
62    #[serde(default = "default_enable_global_limiter")]
63    enable_global_limiter: bool,
64}
65
66impl MemoryBoundsConfiguration {
67    /// Attempts to read memory bounds configuration from the provided configuration.
68    ///
69    /// # Errors
70    ///
71    /// If an error occurs during deserialization, an error will be returned.
72    pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
73        let mut config = config
74            .as_typed::<Self>()
75            .error_context("Failed to parse memory bounds configuration.")?;
76
77        if config.memory_limit.is_none() {
78            // Try to pull configured memory limit from Cgroup if running in a containerized environment.
79            if let Ok(value) = env::var("DOCKER_DD_AGENT") {
80                if !value.is_empty() {
81                    let cgroup_memory_reader = CgroupMemoryParser;
82                    if let Some(memory) = cgroup_memory_reader.parse() {
83                        info!(
84                            "Setting memory limit to {} based on detected cgroups limit.",
85                            memory.display().si()
86                        );
87                        config.memory_limit = Some(memory);
88                    }
89                }
90            }
91        }
92
93        // Try constructing the initial grant based on the configuration as a smoke test to validate the values.
94        if let Some(limit) = config.memory_limit {
95            let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
96                .error_context("Given memory limit and/or slop factor invalid.")?;
97        }
98
99        Ok(config)
100    }
101
102    /// Gets the initial memory grant based on the configuration.
103    pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
104        self.memory_limit.map(|limit| {
105            MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
106                .expect("memory limit should be valid")
107        })
108    }
109}
110
111/// Initializes the memory bounds system and verifies any configured bounds.
112///
113/// If no memory limit is configured, or if the populated memory bounds fit within the configured memory limit,
114/// `Ok(MemoryLimiter)` is returned. The memory limiter can be used as a global limiter for the process, allowing
115/// callers to cooperatively participate in staying within the configured memory bounds by blocking when used memory
116/// exceeds the configured limit, until it returns below the limit. The limiter uses the effective memory limit, based
117/// on the configured slop factor.
118///
119/// # Errors
120///
121/// If the bounds could not be validated, an error is returned.
122pub fn initialize_memory_bounds(
123    configuration: MemoryBoundsConfiguration, component_registry: &ComponentRegistry,
124) -> Result<MemoryLimiter, GenericError> {
125    let initial_grant = match configuration.memory_limit {
126        Some(limit) => MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor)?,
127        None => {
128            info!("No memory limit set for the process. Skipping memory bounds verification.");
129            return Ok(MemoryLimiter::noop());
130        }
131    };
132
133    let verified_bounds = match component_registry.verify_bounds(initial_grant) {
134        Ok(verified_bounds) => verified_bounds,
135        Err(e) => {
136            error!("Failed to verify memory bounds: {}.", e);
137
138            let bounds = component_registry.as_bounds();
139            print_bounds(&bounds);
140
141            return Err(generic_error!(
142                "Configured memory limit is insufficient for the current configuration."
143            ));
144        }
145    };
146
147    let limiter = if configuration.enable_global_limiter {
148        MemoryLimiter::new(initial_grant)
149            .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))
150    } else {
151        Ok(MemoryLimiter::noop())
152    }?;
153
154    info!(
155		"Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
156		bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
157		bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
158		bytes_to_si_string(verified_bounds.total_available_bytes()),
159		bytes_to_si_string(initial_grant.initial_limit_bytes()),
160	);
161
162    print_bounds(verified_bounds.bounds());
163
164    Ok(limiter)
165}
166
167fn print_bounds(bounds: &ComponentBounds) {
168    info!("Breakdown of verified bounds:");
169    info!(
170        "- (root): {} minimum, {} firm",
171        bytes_to_si_string(bounds.total_minimum_required_bytes()),
172        bytes_to_si_string(bounds.total_firm_limit_bytes()),
173    );
174
175    let mut to_visit = VecDeque::new();
176    to_visit.extend(
177        bounds
178            .subcomponents()
179            .into_iter()
180            .map(|(name, bounds)| (1, name, bounds)),
181    );
182
183    while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
184        info!(
185            "{:indent$}- {}: {} minimum, {} firm",
186            "",
187            component_name,
188            bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
189            bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
190            indent = depth * 2
191        );
192
193        let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
194        while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
195            to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
196        }
197    }
198
199    info!("");
200}
201
202struct AllocationGroupMetrics {
203    totals: AllocationStatsSnapshot,
204    allocated_bytes_total: Counter,
205    allocated_bytes_live: Gauge,
206    allocated_objects_total: Counter,
207    allocated_objects_live: Gauge,
208    deallocated_bytes_total: Counter,
209    deallocated_objects_total: Counter,
210}
211
212impl AllocationGroupMetrics {
213    fn new(group_name: &str) -> Self {
214        Self {
215            totals: AllocationStatsSnapshot::empty(),
216            allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
217            allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
218            allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
219            allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
220            deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
221            deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
222        }
223    }
224
225    fn update(&mut self, stats: &AllocationStats) {
226        let delta = stats.snapshot_delta(&self.totals);
227
228        self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
229        self.allocated_objects_total.increment(delta.allocated_objects as u64);
230        self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
231        self.deallocated_objects_total
232            .increment(delta.deallocated_objects as u64);
233
234        self.totals.merge(&delta);
235        self.allocated_bytes_live
236            .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
237        self.allocated_objects_live
238            .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
239    }
240}
241
242/// Initializes the memory allocator telemetry subsystem.
243///
244/// This spawns a background task that will periodically collect memory usage statistics, such as which components are
245/// responsible for which portion of the live heap, and report them as internal telemetry.
246///
247/// # Errors
248///
249/// If the memory allocator subsystem has already been initialized, an error will be returned.
250pub(crate) async fn initialize_allocator_telemetry() -> Result<(), GenericError> {
251    // Simple initialization guard to prevent multiple calls to this function.
252    static INIT: AtomicBool = AtomicBool::new(false);
253    if INIT.swap(true, Relaxed) {
254        return Err(generic_error!("Memory allocator subsystem already initialized."));
255    }
256
257    // We can't enforce, at compile-time, that the tracking allocator must be installed if a caller is trying to
258    // initialize the allocator's reporting infrastructure... but we can at least warn them if we detect it's not
259    // installed here at runtime.
260    if !AllocationGroupRegistry::allocator_installed() {
261        warn!("Tracking allocator not installed. Memory telemetry will not be available.");
262    }
263
264    // Spawn the background task that will periodically collect memory usage statistics.
265    spawn_traced_named("allocator-telemetry-collector", async {
266        let mut metrics = HashMap::new();
267
268        loop {
269            sleep(Duration::from_secs(1)).await;
270
271            AllocationGroupRegistry::global().visit_allocation_groups(|group_name, stats| {
272                let group_metrics = match metrics.get_mut(group_name) {
273                    Some(group_metrics) => group_metrics,
274                    None => metrics
275                        .entry(group_name.to_string())
276                        .or_insert_with(|| AllocationGroupMetrics::new(group_name)),
277                };
278
279                group_metrics.update(stats);
280            });
281        }
282    });
283
284    Ok(())
285}
286
287struct CgroupMemoryParser;
288
289impl CgroupMemoryParser {
290    /// Parse memory limit from memory controller.
291    ///
292    /// Returns `None` if memory limit is set to max or if an error is encountered while parsing.
293    fn parse(self) -> Option<ByteSize> {
294        let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
295        let parts: Vec<&str> = contents.trim().split("\n").collect();
296        // CgroupV2 has unified controllers.
297        if parts.len() == 1 {
298            return self.parse_controller_v2(parts[0]);
299        }
300        for line in parts {
301            if line.contains(":memory:") {
302                return self.parse_controller_v1(line);
303            }
304        }
305        None
306    }
307
308    fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
309        let path = controller.split(":").nth(2)?;
310        let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
311        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
312        self.convert_to_bytesize(&raw_memory_limit)
313    }
314
315    fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
316        let path = controller.split(":").nth(2)?;
317        let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
318        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
319        self.convert_to_bytesize(&raw_memory_limit)
320    }
321
322    fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
323        let memory = s.trim().to_string();
324        if memory == "max" {
325            return None;
326        }
327        memory.parse::<ByteSize>().ok()
328    }
329}
330
331fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
332    ByteSize::b(bytes as u64).display().si()
333}