Skip to main content

saluki_app/
memory.rs

1//! Memory management.
2
3use std::{
4    collections::{HashMap, VecDeque},
5    env, fs,
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use bytesize::ByteSize;
11use memory_accounting::{
12    allocator::{AllocationGroupRegistry, AllocationStats, AllocationStatsSnapshot},
13    ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter,
14};
15use metrics::{counter, gauge, Counter, Gauge, Level};
16use saluki_api::{DynamicRoute, EndpointType};
17use saluki_config::GenericConfiguration;
18use saluki_core::runtime::{
19    state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
20};
21use saluki_error::{generic_error, ErrorContext as _, GenericError};
22use serde::Deserialize;
23use tokio::{pin, select, time::sleep};
24use tracing::{error, info, warn};
25
/// Serde default for `memory_slop_factor`: a quarter of the memory limit is held back.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_SLOP_FACTOR
}
29
/// Serde default for `enable_global_limiter`: the global limiter is on unless explicitly disabled.
const fn default_enable_global_limiter() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
33
34/// Configuration for memory bounds.
35#[derive(Deserialize)]
36pub struct MemoryBoundsConfiguration {
37    /// The memory limit to adhere to.
38    ///
39    /// This should be the overall memory limit for the entire process. The value can either be an integer for
40    /// specifying the limit in bytes, or a string that uses SI byte prefixes (case-insensitive) such as `1mb` or `1GB`.
41    ///
42    /// If not specified, no memory bounds verification will be performed.
43    #[serde(default)]
44    memory_limit: Option<ByteSize>,
45
46    /// The slop factor to apply to the given memory limit.
47    ///
48    /// Memory bounds are inherently fuzzy, as components are required to manually define their bounds, and as such, can
49    /// only account for memory usage that they know about. The slop factor is applied as a reduction to the overall
50    /// memory limit, such that we account for the "known unknowns" -- memory that hasn't yet been accounted for -- by
51    /// simply ensuring that we can fit within a portion of the overall limit.
52    ///
53    /// Values between 0 to 1 are allowed, and represent the percentage of `memory_limit` that is held back. This means
54    /// that a slop factor of 0.25, for example, will cause 25% of `memory_limit` to be withheld. If `memory_limit` was
55    /// 100MB, we would then verify that the memory bounds can fit within 75MB (100MB * (1 - 0.25) => 75MB).
56    #[serde(default = "default_memory_slop_factor")]
57    memory_slop_factor: f64,
58
59    /// Whether or not to enable the global memory limiter.
60    ///
61    /// When set to `false`, the global memory limiter will operate in a no-op mode. All calls to use it will never exert
62    /// backpressure, and only the inherent memory bounds of the running components will influence memory usage.
63    ///
64    /// Defaults to `true`.
65    #[serde(default = "default_enable_global_limiter")]
66    enable_global_limiter: bool,
67}
68
69impl MemoryBoundsConfiguration {
70    /// Attempts to read memory bounds configuration from the provided configuration.
71    ///
72    /// # Errors
73    ///
74    /// If an error occurs during deserialization, an error will be returned.
75    pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
76        let mut config = config
77            .as_typed::<Self>()
78            .error_context("Failed to parse memory bounds configuration.")?;
79
80        if config.memory_limit.is_none() {
81            // Try to pull configured memory limit from Cgroup if running in a containerized environment.
82            if let Ok(value) = env::var("DOCKER_DD_AGENT") {
83                if !value.is_empty() {
84                    let cgroup_memory_reader = CgroupMemoryParser;
85                    if let Some(memory) = cgroup_memory_reader.parse() {
86                        info!(
87                            "Setting memory limit to {} based on detected cgroups limit.",
88                            memory.display().si()
89                        );
90                        config.memory_limit = Some(memory);
91                    }
92                }
93            }
94        }
95
96        // Try constructing the initial grant based on the configuration as a smoke test to validate the values.
97        if let Some(limit) = config.memory_limit {
98            let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
99                .error_context("Given memory limit and/or slop factor invalid.")?;
100        }
101
102        Ok(config)
103    }
104
105    /// Gets the initial memory grant based on the configuration.
106    pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
107        self.memory_limit.map(|limit| {
108            MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
109                .expect("memory limit should be valid")
110        })
111    }
112}
113
114/// Initializes the memory bounds system and verifies any configured bounds.
115///
116/// If no memory limit is configured, or if the populated memory bounds fit within the configured memory limit,
117/// `Ok(MemoryLimiter)` is returned. The memory limiter can be used as a global limiter for the process, allowing
118/// callers to cooperatively participate in staying within the configured memory bounds by blocking when used memory
119/// exceeds the configured limit, until it returns below the limit. The limiter uses the effective memory limit, based
120/// on the configured slop factor.
121///
122/// # Errors
123///
124/// If the bounds could not be validated, an error is returned.
125pub fn initialize_memory_bounds(
126    configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
127) -> Result<MemoryLimiter, GenericError> {
128    let initial_grant = match configuration.memory_limit {
129        Some(limit) => MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor)?,
130        None => {
131            info!("No memory limit set for the process. Skipping memory bounds verification.");
132            return Ok(MemoryLimiter::noop());
133        }
134    };
135
136    let verified_bounds = match component_registry.verify_bounds(initial_grant) {
137        Ok(verified_bounds) => verified_bounds,
138        Err(e) => {
139            error!("Failed to verify memory bounds: {}.", e);
140
141            let bounds = component_registry.as_bounds();
142            print_bounds(&bounds);
143
144            return Err(generic_error!(
145                "Configured memory limit is insufficient for the current configuration."
146            ));
147        }
148    };
149
150    let limiter = if configuration.enable_global_limiter {
151        MemoryLimiter::new(initial_grant)
152            .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))
153    } else {
154        Ok(MemoryLimiter::noop())
155    }?;
156
157    info!(
158		"Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
159		bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
160		bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
161		bytes_to_si_string(verified_bounds.total_available_bytes()),
162		bytes_to_si_string(initial_grant.initial_limit_bytes()),
163	);
164
165    print_bounds(verified_bounds.bounds());
166
167    Ok(limiter)
168}
169
170fn print_bounds(bounds: &ComponentBounds) {
171    info!("Breakdown of verified bounds:");
172    info!(
173        "- (root): {} minimum, {} firm",
174        bytes_to_si_string(bounds.total_minimum_required_bytes()),
175        bytes_to_si_string(bounds.total_firm_limit_bytes()),
176    );
177
178    let mut to_visit = VecDeque::new();
179    to_visit.extend(
180        bounds
181            .subcomponents()
182            .into_iter()
183            .map(|(name, bounds)| (1, name, bounds)),
184    );
185
186    while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
187        info!(
188            "{:indent$}- {}: {} minimum, {} firm",
189            "",
190            component_name,
191            bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
192            bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
193            indent = depth * 2
194        );
195
196        let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
197        while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
198            to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
199        }
200    }
201
202    info!("");
203}
204
205struct AllocationGroupMetrics {
206    totals: AllocationStatsSnapshot,
207    allocated_bytes_total: Counter,
208    allocated_bytes_live: Gauge,
209    allocated_objects_total: Counter,
210    allocated_objects_live: Gauge,
211    deallocated_bytes_total: Counter,
212    deallocated_objects_total: Counter,
213}
214
215impl AllocationGroupMetrics {
216    fn new(group_name: &str) -> Self {
217        Self {
218            totals: AllocationStatsSnapshot::empty(),
219            allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
220            allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
221            allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
222            allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
223            deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
224            deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
225        }
226    }
227
228    fn update(&mut self, stats: &AllocationStats) {
229        let delta = stats.snapshot_delta(&self.totals);
230
231        self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
232        self.allocated_objects_total.increment(delta.allocated_objects as u64);
233        self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
234        self.deallocated_objects_total
235            .increment(delta.deallocated_objects as u64);
236
237        self.totals.merge(&delta);
238        self.allocated_bytes_live
239            .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
240        self.allocated_objects_live
241            .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
242    }
243}
244
245/// A worker that periodically collects per-allocation group memory usage statistics and emits the internal telemetry.
246///
247/// Additionally, asserts the memory API routes from the given [`ComponentRegistry`] as a [`DynamicRoute`] on the
248/// unprivileged API endpoint.
249pub struct AllocationTelemetryWorker {
250    component_registry: ComponentRegistryHandle,
251}
252
253impl AllocationTelemetryWorker {
254    /// Creates a new `AllocationTelemetryWorker` for the given component registry.
255    pub fn new(component_registry: &ComponentRegistry) -> Self {
256        Self {
257            component_registry: component_registry.root(),
258        }
259    }
260}
261
262#[async_trait]
263impl Supervisable for AllocationTelemetryWorker {
264    fn name(&self) -> &str {
265        "alloc-telemetry"
266    }
267
268    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
269        // We can't enforce, at compile-time, that the tracking allocator must be installed if a caller is trying to
270        // initialize the allocator's reporting infrastructure... but we can at least warn them if we detect it's not
271        // installed here at runtime.
272        if !AllocationGroupRegistry::allocator_installed() {
273            warn!("Tracking allocator not installed. Memory telemetry will not be available.");
274        }
275
276        let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
277
278        Ok(Box::pin(async move {
279            // Register our API routes before we actually start running.
280            DataspaceRegistry::try_current()
281                .ok_or_else(|| generic_error!("Dataspace not available."))?
282                .assert(memory_routes, "alloc-telemetry-api");
283
284            let mut metrics = HashMap::new();
285
286            let shutdown = process_shutdown.wait_for_shutdown();
287            pin!(shutdown);
288
289            loop {
290                select! {
291                    _ = &mut shutdown => break,
292                    _ = sleep(Duration::from_secs(1)) => {
293                        AllocationGroupRegistry::global().visit_allocation_groups(|group_name, stats| {
294                            let group_metrics = match metrics.get_mut(group_name) {
295                                Some(group_metrics) => group_metrics,
296                                None => metrics
297                                    .entry(group_name.to_string())
298                                    .or_insert_with(|| AllocationGroupMetrics::new(group_name)),
299                            };
300
301                            group_metrics.update(stats);
302                        });
303                    }
304                }
305            }
306
307            Ok(())
308        }))
309    }
310}
311
312struct CgroupMemoryParser;
313
314impl CgroupMemoryParser {
315    /// Parse memory limit from memory controller.
316    ///
317    /// Returns `None` if memory limit is set to max or if an error is encountered while parsing.
318    fn parse(self) -> Option<ByteSize> {
319        let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
320        let parts: Vec<&str> = contents.trim().split("\n").collect();
321        // CgroupV2 has unified controllers.
322        if parts.len() == 1 {
323            return self.parse_controller_v2(parts[0]);
324        }
325        for line in parts {
326            if line.contains(":memory:") {
327                return self.parse_controller_v1(line);
328            }
329        }
330        None
331    }
332
333    fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
334        let path = controller.split(":").nth(2)?;
335        let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
336        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
337        self.convert_to_bytesize(&raw_memory_limit)
338    }
339
340    fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
341        let path = controller.split(":").nth(2)?;
342        let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
343        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
344        self.convert_to_bytesize(&raw_memory_limit)
345    }
346
347    fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
348        let memory = s.trim().to_string();
349        if memory == "max" {
350            return None;
351        }
352        memory.parse::<ByteSize>().ok()
353    }
354}
355
356fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
357    ByteSize::b(bytes as u64).display().si()
358}