Skip to main content

saluki_app/
accounting.rs

1//! Resource accounting and telemetry.
2
3use std::{
4    collections::{HashMap, VecDeque},
5    env, fs,
6    time::Duration,
7};
8
9use bytesize::ByteSize;
10use metrics::{counter, gauge, Counter, Gauge, Level};
11use resource_accounting::{
12    ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter, ResourceGroupRegistry,
13    ResourceStats, ResourceStatsSnapshot,
14};
15use saluki_api::{DynamicRoute, EndpointType};
16use saluki_config::GenericConfiguration;
17use saluki_core::runtime::{
18    state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
19};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use serde::Deserialize;
22use tokio::{pin, select, time::sleep};
23use tonic::async_trait;
24use tracing::{error, info, warn};
25
/// Returns the slop factor used when the configuration does not specify one (25% of the memory limit held back).
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_MEMORY_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_MEMORY_SLOP_FACTOR
}
29
/// Returns whether the global memory limiter is enabled when the configuration does not say otherwise.
const fn default_enable_global_limiter() -> bool {
    const DEFAULT_ENABLE_GLOBAL_LIMITER: bool = true;
    DEFAULT_ENABLE_GLOBAL_LIMITER
}
33
34/// Bounds validation and global memory limiter behavior.
35#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
36#[serde(rename_all = "lowercase")]
37pub enum MemoryMode {
38    /// Bounds validation is skipped, and no memory limiting is applied.
39    #[default]
40    Disabled,
41
42    /// Treat bounds validation failures as non-fatal.
43    ///
44    /// Global memory limiter will be enabled and active if a memory limit is configured.
45    Permissive,
46
47    /// Treat bounds validation failures as fatal.
48    ///
49    /// Global memory limiter will be enabled and active if a memory limit is configured.
50    Strict,
51}
52
53/// Configuration for memory bounds.
54#[derive(Deserialize)]
55pub struct MemoryBoundsConfiguration {
56    /// The memory limit to adhere to.
57    ///
58    /// This should be the overall memory limit for the entire process. The value can either be an integer for
59    /// specifying the limit in bytes, or a string that uses SI byte prefixes (case-insensitive) such as `1mb` or `1GB`.
60    ///
61    /// If not specified, no memory bounds verification will be performed.
62    #[serde(default)]
63    memory_limit: Option<ByteSize>,
64
65    /// The slop factor to apply to the given memory limit.
66    ///
67    /// Memory bounds are inherently fuzzy, as components are required to manually define their bounds, and as such, can
68    /// only account for memory usage that they know about. The slop factor is applied as a reduction to the overall
69    /// memory limit, such that we account for the "known unknowns" -- memory that hasn't yet been accounted for -- by
70    /// simply ensuring that we can fit within a portion of the overall limit.
71    ///
72    /// Values between 0 to 1 are allowed, and represent the percentage of `memory_limit` that is held back. This means
73    /// that a slop factor of 0.25, for example, will cause 25% of `memory_limit` to be withheld. If `memory_limit` was
74    /// 100 MB, we would then verify that the memory bounds can fit within 75 MB (100 MB * (1 - 0.25) => 75 MB).
75    #[serde(default = "default_memory_slop_factor")]
76    memory_slop_factor: f64,
77
78    /// Whether or not to enable the global memory limiter.
79    ///
80    /// When set to `false`, the global memory limiter will operate in a no-op mode. All calls to use it will never
81    /// exert backpressure, and only the inherent memory bounds of the running components will influence memory usage.
82    ///
83    /// Defaults to `true`.
84    #[serde(default = "default_enable_global_limiter")]
85    enable_global_limiter: bool,
86
87    /// The memory mode to use when reconciling the calculated memory bounds against the configured memory limit.
88    ///
89    /// See [`MemoryMode`] for the available modes and their behavior.
90    ///
91    /// Defaults to [`MemoryMode::Disabled`].
92    #[serde(default)]
93    memory_mode: MemoryMode,
94}
95
96impl MemoryBoundsConfiguration {
97    /// Attempts to read memory bounds configuration from the provided configuration.
98    ///
99    /// # Errors
100    ///
101    /// If an error occurs during deserialization, an error will be returned.
102    pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
103        let mut config = config
104            .as_typed::<Self>()
105            .error_context("Failed to parse memory bounds configuration.")?;
106
107        if config.memory_limit.is_none() {
108            // Try to pull configured memory limit from Cgroup if running in a containerized environment.
109            if let Ok(value) = env::var("DOCKER_DD_AGENT") {
110                if !value.is_empty() {
111                    let cgroup_memory_reader = CgroupMemoryParser;
112                    if let Some(memory) = cgroup_memory_reader.parse() {
113                        info!(
114                            "Setting memory limit to {} based on detected cgroups limit.",
115                            memory.display().si()
116                        );
117                        config.memory_limit = Some(memory);
118                    }
119                }
120            }
121        }
122
123        // Try constructing the initial grant based on the configuration as a smoke test to validate the values.
124        if let Some(limit) = config.memory_limit {
125            let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
126                .error_context("Given memory limit and/or slop factor invalid.")?;
127        }
128
129        Ok(config)
130    }
131
132    /// Gets the initial memory grant based on the configuration.
133    pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
134        self.memory_limit.map(|limit| {
135            MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
136                .expect("memory limit should be valid")
137        })
138    }
139}
140
141/// Initializes the memory bounds system and verifies any configured bounds based on the configured memory mode.
142///
143/// See [`MemoryMode`] for details on the behavior of each mode.
144///
145/// # Errors
146///
147/// If the bounds could not be validated under [`MemoryMode::Strict`], or if the configured grant is invalid, an error
148/// is returned.
149pub fn initialize_memory_bounds(
150    configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
151) -> Result<MemoryLimiter, GenericError> {
152    let configured_grant = configuration
153        .memory_limit
154        .map(|limit| MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor))
155        .transpose()?;
156
157    let limiter_grant = match configuration.memory_mode {
158        MemoryMode::Disabled => {
159            info!("Memory limiting disabled.");
160            None
161        }
162        mode @ (MemoryMode::Permissive | MemoryMode::Strict) => match configured_grant {
163            Some(grant) => {
164                verify_bounds_for_mode(mode, grant, &component_registry)?;
165                Some(grant)
166            }
167            None => {
168                info!("No memory limit set for the process. Skipping memory bounds verification.");
169                None
170            }
171        },
172    };
173
174    let limiter = match limiter_grant {
175        Some(grant) if configuration.enable_global_limiter => MemoryLimiter::new(grant)
176            .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))?,
177        _ => MemoryLimiter::noop(),
178    };
179
180    Ok(limiter)
181}
182
183fn verify_bounds_for_mode(
184    mode: MemoryMode, initial_grant: MemoryGrant, component_registry: &ComponentRegistryHandle,
185) -> Result<(), GenericError> {
186    match component_registry.verify_bounds(initial_grant) {
187        Ok(verified_bounds) => {
188            info!(
189				"Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
190				bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
191				bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
192				bytes_to_si_string(verified_bounds.total_available_bytes()),
193				bytes_to_si_string(initial_grant.initial_limit_bytes()),
194			);
195
196            print_bounds(verified_bounds.bounds());
197            Ok(())
198        }
199        Err(e) => {
200            let bounds = component_registry.as_bounds();
201            print_bounds(&bounds);
202
203            match mode {
204                MemoryMode::Strict => {
205                    error!("Failed to verify memory bounds: {}.", e);
206                    Err(generic_error!(
207                        "Configured memory limit is insufficient for the current configuration."
208                    ))
209                }
210                MemoryMode::Permissive => {
211                    warn!(
212                        "Configured memory limit ({}) may be insufficient for the current configuration. Memory limiting behavior will be best effort. Continuing.",
213                        bytes_to_si_string(initial_grant.initial_limit_bytes()),
214                    );
215                    Ok(())
216                }
217                MemoryMode::Disabled => unreachable!("verify_bounds_for_mode is never called with Disabled mode"),
218            }
219        }
220    }
221}
222
223fn print_bounds(bounds: &ComponentBounds) {
224    info!("Breakdown of verified bounds:");
225    info!(
226        "- (root): {} minimum, {} firm",
227        bytes_to_si_string(bounds.total_minimum_required_bytes()),
228        bytes_to_si_string(bounds.total_firm_limit_bytes()),
229    );
230
231    let mut to_visit = VecDeque::new();
232    to_visit.extend(
233        bounds
234            .subcomponents()
235            .into_iter()
236            .map(|(name, bounds)| (1, name, bounds)),
237    );
238
239    while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
240        info!(
241            "{:indent$}- {}: {} minimum, {} firm",
242            "",
243            component_name,
244            bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
245            bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
246            indent = depth * 2
247        );
248
249        let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
250        while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
251            to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
252        }
253    }
254
255    info!("");
256}
257
258struct ResourceGroupMetrics {
259    totals: ResourceStatsSnapshot,
260    allocated_bytes_total: Counter,
261    allocated_bytes_live: Gauge,
262    allocated_objects_total: Counter,
263    allocated_objects_live: Gauge,
264    deallocated_bytes_total: Counter,
265    deallocated_objects_total: Counter,
266    cpu_time_nanos_total: Counter,
267}
268
269impl ResourceGroupMetrics {
270    fn new(group_name: &str) -> Self {
271        Self {
272            totals: ResourceStatsSnapshot::empty(),
273            allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
274            allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
275            allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
276            allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
277            deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
278            deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
279            cpu_time_nanos_total: counter!(level: Level::DEBUG, "group_cpu_time_nanos_total", "group_id" => group_name.to_string()),
280        }
281    }
282
283    fn update(&mut self, stats: &ResourceStats) {
284        let delta = stats.snapshot_delta(&self.totals);
285
286        self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
287        self.allocated_objects_total.increment(delta.allocated_objects as u64);
288        self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
289        self.deallocated_objects_total
290            .increment(delta.deallocated_objects as u64);
291        self.cpu_time_nanos_total.increment(delta.cpu_time_nanos);
292
293        self.totals.merge(&delta);
294        self.allocated_bytes_live
295            .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
296        self.allocated_objects_live
297            .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
298    }
299}
300
301/// A worker that periodically collects per-resource group memory usage statistics and emits the internal telemetry.
302///
303/// Additionally, asserts the memory API routes from the given [`ComponentRegistry`] as a [`DynamicRoute`] on the
304/// unprivileged API endpoint.
305pub struct ResourceTelemetryWorker {
306    component_registry: ComponentRegistryHandle,
307}
308
309impl ResourceTelemetryWorker {
310    /// Creates a new `ResourceTelemetryWorker` for the given component registry.
311    pub fn new(component_registry: &ComponentRegistry) -> Self {
312        Self {
313            component_registry: component_registry.root(),
314        }
315    }
316}
317
318#[async_trait]
319impl Supervisable for ResourceTelemetryWorker {
320    fn name(&self) -> &str {
321        "resource-telemetry"
322    }
323
324    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
325        // We can't enforce, at compile-time, that the tracking allocator must be installed if a caller is trying to
326        // initialize the allocator's reporting infrastructure... but we can at least warn them if we detect it's not
327        // installed here at runtime.
328        if !ResourceGroupRegistry::allocator_installed() {
329            warn!("Tracking allocator not installed. Memory telemetry will not be available.");
330        }
331
332        let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
333
334        Ok(Box::pin(async move {
335            // Register our API routes before we actually start running.
336            DataspaceRegistry::try_current()
337                .ok_or_else(|| generic_error!("Dataspace not available."))?
338                .assert(memory_routes, "resource-telemetry-api");
339
340            let mut metrics = HashMap::new();
341
342            let shutdown = process_shutdown.wait_for_shutdown();
343            pin!(shutdown);
344
345            loop {
346                select! {
347                    _ = &mut shutdown => break,
348                    _ = sleep(Duration::from_secs(1)) => {
349                        ResourceGroupRegistry::global().visit_resource_groups(|group_name, stats| {
350                            let group_metrics = match metrics.get_mut(group_name) {
351                                Some(group_metrics) => group_metrics,
352                                None => metrics
353                                    .entry(group_name.to_string())
354                                    .or_insert_with(|| ResourceGroupMetrics::new(group_name)),
355                            };
356
357                            group_metrics.update(stats);
358                        });
359                    }
360                }
361            }
362
363            Ok(())
364        }))
365    }
366}
367
368struct CgroupMemoryParser;
369
370impl CgroupMemoryParser {
371    /// Parse memory limit from memory controller.
372    ///
373    /// Returns `None` if memory limit is set to max or if an error is encountered while parsing.
374    fn parse(self) -> Option<ByteSize> {
375        let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
376        let parts: Vec<&str> = contents.trim().split("\n").collect();
377        // CgroupV2 has unified controllers.
378        if parts.len() == 1 {
379            return self.parse_controller_v2(parts[0]);
380        }
381        for line in parts {
382            if line.contains(":memory:") {
383                return self.parse_controller_v1(line);
384            }
385        }
386        None
387    }
388
389    fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
390        let path = controller.split(":").nth(2)?;
391        let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
392        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
393        self.convert_to_bytesize(&raw_memory_limit)
394    }
395
396    fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
397        let path = controller.split(":").nth(2)?;
398        let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
399        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
400        self.convert_to_bytesize(&raw_memory_limit)
401    }
402
403    fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
404        let memory = s.trim().to_string();
405        if memory == "max" {
406            return None;
407        }
408        memory.parse::<ByteSize>().ok()
409    }
410}
411
412fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
413    ByteSize::b(bytes as u64).display().si()
414}