Skip to main content

saluki_app/
accounting.rs

1//! Resource accounting and telemetry.
2
3use std::{collections::VecDeque, env, fs, time::Duration};
4
5use bytesize::ByteSize;
6use metrics::{counter, gauge, Counter, Gauge, Level};
7use resource_accounting::{
8    ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter, ResourceGroupRegistry,
9    ResourceStats, ResourceStatsSnapshot,
10};
11use saluki_api::{DynamicRoute, EndpointType};
12use saluki_common::{collections::FastHashMap, sync::shutdown::ShutdownHandle};
13use saluki_config::GenericConfiguration;
14use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
15use saluki_error::{generic_error, ErrorContext as _, GenericError};
16use serde::Deserialize;
17use tokio::{select, time::sleep};
18use tonic::async_trait;
19use tracing::{error, info, warn};
20
/// Fraction of the configured memory limit that is held back when no slop factor is configured.
const DEFAULT_MEMORY_SLOP_FACTOR: f64 = 0.25;

/// Serde default for `memory_slop_factor`.
const fn default_memory_slop_factor() -> f64 {
    DEFAULT_MEMORY_SLOP_FACTOR
}
24
/// Whether the global memory limiter is enabled when the setting is absent from the configuration.
const DEFAULT_ENABLE_GLOBAL_LIMITER: bool = true;

/// Serde default for `enable_global_limiter`.
const fn default_enable_global_limiter() -> bool {
    DEFAULT_ENABLE_GLOBAL_LIMITER
}
28
/// Bounds validation and global memory limiter behavior.
///
/// When deserialized, variants are matched by their lowercase names: `disabled`, `permissive`, or `strict`.
#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum MemoryMode {
    /// Bounds validation is skipped, and no memory limiting is applied.
    ///
    /// This is the default mode.
    #[default]
    Disabled,

    /// Treat bounds validation failures as non-fatal.
    ///
    /// Global memory limiter will be enabled and active if a memory limit is configured (and the global limiter has
    /// not been turned off via `enable_global_limiter`).
    Permissive,

    /// Treat bounds validation failures as fatal.
    ///
    /// Global memory limiter will be enabled and active if a memory limit is configured (and the global limiter has
    /// not been turned off via `enable_global_limiter`).
    Strict,
}
47
/// Configuration for memory bounds.
///
/// Usually obtained via [`MemoryBoundsConfiguration::try_from_config`], which also validates the memory limit and
/// slop factor, and may fill in the memory limit from the detected cgroup limit.
#[derive(Deserialize)]
pub struct MemoryBoundsConfiguration {
    /// The memory limit to adhere to.
    ///
    /// This should be the overall memory limit for the entire process. The value can either be an integer for
    /// specifying the limit in bytes, or a string that uses SI byte prefixes (case-insensitive) such as `1mb` or `1GB`.
    ///
    /// If not specified, and the `DOCKER_DD_AGENT` environment variable is set to a non-empty value,
    /// [`MemoryBoundsConfiguration::try_from_config`] will try to use the memory limit of the process's cgroup instead.
    /// If no limit is specified or detected, no memory bounds verification will be performed.
    #[serde(default)]
    memory_limit: Option<ByteSize>,

    /// The slop factor to apply to the given memory limit.
    ///
    /// Memory bounds are inherently fuzzy, as components are required to manually define their bounds, and as such, can
    /// only account for memory usage that they know about. The slop factor is applied as a reduction to the overall
    /// memory limit, such that we account for the "known unknowns" -- memory that hasn't yet been accounted for -- by
    /// simply ensuring that we can fit within a portion of the overall limit.
    ///
    /// Values from 0 to 1 are allowed, and represent the fraction of `memory_limit` that is held back. This means
    /// that a slop factor of 0.25, for example, will cause 25% of `memory_limit` to be withheld. If `memory_limit` was
    /// 100 MB, we would then verify that the memory bounds can fit within 75 MB (100 MB * (1 - 0.25) => 75 MB).
    ///
    /// Defaults to `0.25`.
    #[serde(default = "default_memory_slop_factor")]
    memory_slop_factor: f64,

    /// Whether or not to enable the global memory limiter.
    ///
    /// When set to `false`, the global memory limiter will operate in a no-op mode. All calls to use it will never
    /// exert backpressure, and only the inherent memory bounds of the running components will influence memory usage.
    /// The limiter is also a no-op when `memory_mode` is [`MemoryMode::Disabled`] or when no memory limit is set.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_enable_global_limiter")]
    enable_global_limiter: bool,

    /// The memory mode to use when reconciling the calculated memory bounds against the configured memory limit.
    ///
    /// See [`MemoryMode`] for the available modes and their behavior.
    ///
    /// Defaults to [`MemoryMode::Disabled`].
    #[serde(default)]
    memory_mode: MemoryMode,
}
90
91impl MemoryBoundsConfiguration {
92    /// Attempts to read memory bounds configuration from the provided configuration.
93    ///
94    /// # Errors
95    ///
96    /// If an error occurs during deserialization, an error will be returned.
97    pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
98        let mut config = config
99            .as_typed::<Self>()
100            .error_context("Failed to parse memory bounds configuration.")?;
101
102        if config.memory_limit.is_none() {
103            // Try to pull configured memory limit from Cgroup if running in a containerized environment.
104            if let Ok(value) = env::var("DOCKER_DD_AGENT") {
105                if !value.is_empty() {
106                    let cgroup_memory_reader = CgroupMemoryParser;
107                    if let Some(memory) = cgroup_memory_reader.parse() {
108                        info!(
109                            "Setting memory limit to {} based on detected cgroups limit.",
110                            memory.display().si()
111                        );
112                        config.memory_limit = Some(memory);
113                    }
114                }
115            }
116        }
117
118        // Try constructing the initial grant based on the configuration as a smoke test to validate the values.
119        if let Some(limit) = config.memory_limit {
120            let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
121                .error_context("Given memory limit and/or slop factor invalid.")?;
122        }
123
124        Ok(config)
125    }
126
127    /// Gets the initial memory grant based on the configuration.
128    pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
129        self.memory_limit.map(|limit| {
130            MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
131                .expect("memory limit should be valid")
132        })
133    }
134}
135
136/// Initializes the memory bounds system and verifies any configured bounds based on the configured memory mode.
137///
138/// See [`MemoryMode`] for details on the behavior of each mode.
139///
140/// # Errors
141///
142/// If the bounds could not be validated under [`MemoryMode::Strict`], or if the configured grant is invalid, an error
143/// is returned.
144pub fn initialize_memory_bounds(
145    configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
146) -> Result<MemoryLimiter, GenericError> {
147    let configured_grant = configuration
148        .memory_limit
149        .map(|limit| MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor))
150        .transpose()?;
151
152    let limiter_grant = match configuration.memory_mode {
153        MemoryMode::Disabled => {
154            info!("Memory limiting disabled.");
155            None
156        }
157        mode @ (MemoryMode::Permissive | MemoryMode::Strict) => match configured_grant {
158            Some(grant) => {
159                verify_bounds_for_mode(mode, grant, &component_registry)?;
160                Some(grant)
161            }
162            None => {
163                info!("No memory limit set for the process. Skipping memory bounds verification.");
164                None
165            }
166        },
167    };
168
169    let limiter = match limiter_grant {
170        Some(grant) if configuration.enable_global_limiter => MemoryLimiter::new(grant)
171            .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))?,
172        _ => MemoryLimiter::noop(),
173    };
174
175    Ok(limiter)
176}
177
178fn verify_bounds_for_mode(
179    mode: MemoryMode, initial_grant: MemoryGrant, component_registry: &ComponentRegistryHandle,
180) -> Result<(), GenericError> {
181    match component_registry.verify_bounds(initial_grant) {
182        Ok(verified_bounds) => {
183            info!(
184				"Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
185				bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
186				bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
187				bytes_to_si_string(verified_bounds.total_available_bytes()),
188				bytes_to_si_string(initial_grant.initial_limit_bytes()),
189			);
190
191            print_bounds(verified_bounds.bounds());
192            Ok(())
193        }
194        Err(e) => {
195            let bounds = component_registry.as_bounds();
196            print_bounds(&bounds);
197
198            match mode {
199                MemoryMode::Strict => {
200                    error!("Failed to verify memory bounds: {}.", e);
201                    Err(generic_error!(
202                        "Configured memory limit is insufficient for the current configuration."
203                    ))
204                }
205                MemoryMode::Permissive => {
206                    warn!(
207                        "Configured memory limit ({}) may be insufficient for the current configuration. Memory limiting behavior will be best effort. Continuing.",
208                        bytes_to_si_string(initial_grant.initial_limit_bytes()),
209                    );
210                    Ok(())
211                }
212                MemoryMode::Disabled => unreachable!("verify_bounds_for_mode is never called with Disabled mode"),
213            }
214        }
215    }
216}
217
218fn print_bounds(bounds: &ComponentBounds) {
219    info!("Breakdown of verified bounds:");
220    info!(
221        "- (root): {} minimum, {} firm",
222        bytes_to_si_string(bounds.total_minimum_required_bytes()),
223        bytes_to_si_string(bounds.total_firm_limit_bytes()),
224    );
225
226    let mut to_visit = VecDeque::new();
227    to_visit.extend(
228        bounds
229            .subcomponents()
230            .into_iter()
231            .map(|(name, bounds)| (1, name, bounds)),
232    );
233
234    while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
235        info!(
236            "{:indent$}- {}: {} minimum, {} firm",
237            "",
238            component_name,
239            bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
240            bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
241            indent = depth * 2
242        );
243
244        let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
245        while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
246            to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
247        }
248    }
249
250    info!("");
251}
252
253struct ResourceGroupMetrics {
254    totals: ResourceStatsSnapshot,
255    allocated_bytes_total: Counter,
256    allocated_bytes_live: Gauge,
257    allocated_objects_total: Counter,
258    allocated_objects_live: Gauge,
259    deallocated_bytes_total: Counter,
260    deallocated_objects_total: Counter,
261    cpu_time_nanos_total: Counter,
262}
263
264impl ResourceGroupMetrics {
265    fn new(group_name: &str) -> Self {
266        Self {
267            totals: ResourceStatsSnapshot::empty(),
268            allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
269            allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
270            allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
271            allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
272            deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
273            deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
274            cpu_time_nanos_total: counter!(level: Level::DEBUG, "group_cpu_time_nanos_total", "group_id" => group_name.to_string()),
275        }
276    }
277
278    fn update(&mut self, stats: &ResourceStats) {
279        let delta = stats.snapshot_delta(&self.totals);
280
281        self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
282        self.allocated_objects_total.increment(delta.allocated_objects as u64);
283        self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
284        self.deallocated_objects_total
285            .increment(delta.deallocated_objects as u64);
286        self.cpu_time_nanos_total.increment(delta.cpu_time_nanos);
287
288        self.totals.merge(&delta);
289        self.allocated_bytes_live
290            .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
291        self.allocated_objects_live
292            .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
293    }
294}
295
296/// A worker that periodically collects per-resource group memory usage statistics and emits the internal telemetry.
297///
298/// Additionally, asserts the memory API routes from the given [`ComponentRegistry`] as a [`DynamicRoute`] on the
299/// unprivileged API endpoint.
300pub struct ResourceTelemetryWorker {
301    component_registry: ComponentRegistryHandle,
302}
303
304impl ResourceTelemetryWorker {
305    /// Creates a new `ResourceTelemetryWorker` for the given component registry.
306    pub fn new(component_registry: &ComponentRegistry) -> Self {
307        Self {
308            component_registry: component_registry.root(),
309        }
310    }
311}
312
313#[async_trait]
314impl Supervisable for ResourceTelemetryWorker {
315    fn name(&self) -> &str {
316        "resource-telemetry"
317    }
318
319    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
320        // We can't enforce, at compile-time, that the tracking allocator must be installed if a caller is trying to
321        // initialize the allocator's reporting infrastructure... but we can at least warn them if we detect it's not
322        // installed here at runtime.
323        if !ResourceGroupRegistry::allocator_installed() {
324            warn!("Tracking allocator not installed. Memory telemetry will not be available.");
325        }
326
327        let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
328
329        Ok(Box::pin(async move {
330            // Register our API routes before we actually start running.
331            DataspaceRegistry::try_current()
332                .ok_or_else(|| generic_error!("Dataspace not available."))?
333                .assert(memory_routes, "resource-telemetry-api");
334
335            select! {
336                _ = process_shutdown => {},
337                _ = run_resource_group_metrics_loop() => {},
338            }
339
340            Ok(())
341        }))
342    }
343}
344
345async fn run_resource_group_metrics_loop() {
346    let mut metrics = FastHashMap::default();
347
348    loop {
349        ResourceGroupRegistry::global().visit_resource_groups(|group_name, stats| {
350            let group_metrics = match metrics.get_mut(group_name) {
351                Some(group_metrics) => group_metrics,
352                None => metrics
353                    .entry(group_name.to_string())
354                    .or_insert_with(|| ResourceGroupMetrics::new(group_name)),
355            };
356
357            group_metrics.update(stats);
358        });
359
360        sleep(Duration::from_secs(1)).await;
361    }
362}
363
364struct CgroupMemoryParser;
365
366impl CgroupMemoryParser {
367    /// Parse memory limit from memory controller.
368    ///
369    /// Returns `None` if memory limit is set to max or if an error is encountered while parsing.
370    fn parse(self) -> Option<ByteSize> {
371        let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
372        let parts: Vec<&str> = contents.trim().split("\n").collect();
373        // CgroupV2 has unified controllers.
374        if parts.len() == 1 {
375            return self.parse_controller_v2(parts[0]);
376        }
377        for line in parts {
378            if line.contains(":memory:") {
379                return self.parse_controller_v1(line);
380            }
381        }
382        None
383    }
384
385    fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
386        let path = controller.split(":").nth(2)?;
387        let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
388        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
389        self.convert_to_bytesize(&raw_memory_limit)
390    }
391
392    fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
393        let path = controller.split(":").nth(2)?;
394        let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
395        let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
396        self.convert_to_bytesize(&raw_memory_limit)
397    }
398
399    fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
400        let memory = s.trim().to_string();
401        if memory == "max" {
402            return None;
403        }
404        memory.parse::<ByteSize>().ok()
405    }
406}
407
408fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
409    ByteSize::b(bytes as u64).display().si()
410}