1use std::{collections::VecDeque, env, fs, time::Duration};
4
5use bytesize::ByteSize;
6use metrics::{counter, gauge, Counter, Gauge, Level};
7use resource_accounting::{
8 ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter, ResourceGroupRegistry,
9 ResourceStats, ResourceStatsSnapshot,
10};
11use saluki_api::{DynamicRoute, EndpointType};
12use saluki_common::{collections::FastHashMap, sync::shutdown::ShutdownHandle};
13use saluki_config::GenericConfiguration;
14use saluki_core::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
15use saluki_error::{generic_error, ErrorContext as _, GenericError};
16use serde::Deserialize;
17use tokio::{select, time::sleep};
18use tonic::async_trait;
19use tracing::{error, info, warn};
20
/// Default for `memory_slop_factor` when the configuration does not provide one.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `MemoryBoundsConfiguration::memory_slop_factor`.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_MEMORY_SLOP_FACTOR: f64 = 0.25;

    DEFAULT_MEMORY_SLOP_FACTOR
}
24
/// Default for `enable_global_limiter` when the configuration does not provide one.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `MemoryBoundsConfiguration::enable_global_limiter`.
const fn default_enable_global_limiter() -> bool {
    const GLOBAL_LIMITER_ENABLED_BY_DEFAULT: bool = true;

    GLOBAL_LIMITER_ENABLED_BY_DEFAULT
}
28
29#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
31#[serde(rename_all = "lowercase")]
32pub enum MemoryMode {
33 #[default]
35 Disabled,
36
37 Permissive,
41
42 Strict,
46}
47
48#[derive(Deserialize)]
50pub struct MemoryBoundsConfiguration {
51 #[serde(default)]
58 memory_limit: Option<ByteSize>,
59
60 #[serde(default = "default_memory_slop_factor")]
71 memory_slop_factor: f64,
72
73 #[serde(default = "default_enable_global_limiter")]
80 enable_global_limiter: bool,
81
82 #[serde(default)]
88 memory_mode: MemoryMode,
89}
90
91impl MemoryBoundsConfiguration {
92 pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
98 let mut config = config
99 .as_typed::<Self>()
100 .error_context("Failed to parse memory bounds configuration.")?;
101
102 if config.memory_limit.is_none() {
103 if let Ok(value) = env::var("DOCKER_DD_AGENT") {
105 if !value.is_empty() {
106 let cgroup_memory_reader = CgroupMemoryParser;
107 if let Some(memory) = cgroup_memory_reader.parse() {
108 info!(
109 "Setting memory limit to {} based on detected cgroups limit.",
110 memory.display().si()
111 );
112 config.memory_limit = Some(memory);
113 }
114 }
115 }
116 }
117
118 if let Some(limit) = config.memory_limit {
120 let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
121 .error_context("Given memory limit and/or slop factor invalid.")?;
122 }
123
124 Ok(config)
125 }
126
127 pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
129 self.memory_limit.map(|limit| {
130 MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
131 .expect("memory limit should be valid")
132 })
133 }
134}
135
136pub fn initialize_memory_bounds(
145 configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
146) -> Result<MemoryLimiter, GenericError> {
147 let configured_grant = configuration
148 .memory_limit
149 .map(|limit| MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor))
150 .transpose()?;
151
152 let limiter_grant = match configuration.memory_mode {
153 MemoryMode::Disabled => {
154 info!("Memory limiting disabled.");
155 None
156 }
157 mode @ (MemoryMode::Permissive | MemoryMode::Strict) => match configured_grant {
158 Some(grant) => {
159 verify_bounds_for_mode(mode, grant, &component_registry)?;
160 Some(grant)
161 }
162 None => {
163 info!("No memory limit set for the process. Skipping memory bounds verification.");
164 None
165 }
166 },
167 };
168
169 let limiter = match limiter_grant {
170 Some(grant) if configuration.enable_global_limiter => MemoryLimiter::new(grant)
171 .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))?,
172 _ => MemoryLimiter::noop(),
173 };
174
175 Ok(limiter)
176}
177
178fn verify_bounds_for_mode(
179 mode: MemoryMode, initial_grant: MemoryGrant, component_registry: &ComponentRegistryHandle,
180) -> Result<(), GenericError> {
181 match component_registry.verify_bounds(initial_grant) {
182 Ok(verified_bounds) => {
183 info!(
184 "Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
185 bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
186 bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
187 bytes_to_si_string(verified_bounds.total_available_bytes()),
188 bytes_to_si_string(initial_grant.initial_limit_bytes()),
189 );
190
191 print_bounds(verified_bounds.bounds());
192 Ok(())
193 }
194 Err(e) => {
195 let bounds = component_registry.as_bounds();
196 print_bounds(&bounds);
197
198 match mode {
199 MemoryMode::Strict => {
200 error!("Failed to verify memory bounds: {}.", e);
201 Err(generic_error!(
202 "Configured memory limit is insufficient for the current configuration."
203 ))
204 }
205 MemoryMode::Permissive => {
206 warn!(
207 "Configured memory limit ({}) may be insufficient for the current configuration. Memory limiting behavior will be best effort. Continuing.",
208 bytes_to_si_string(initial_grant.initial_limit_bytes()),
209 );
210 Ok(())
211 }
212 MemoryMode::Disabled => unreachable!("verify_bounds_for_mode is never called with Disabled mode"),
213 }
214 }
215 }
216}
217
218fn print_bounds(bounds: &ComponentBounds) {
219 info!("Breakdown of verified bounds:");
220 info!(
221 "- (root): {} minimum, {} firm",
222 bytes_to_si_string(bounds.total_minimum_required_bytes()),
223 bytes_to_si_string(bounds.total_firm_limit_bytes()),
224 );
225
226 let mut to_visit = VecDeque::new();
227 to_visit.extend(
228 bounds
229 .subcomponents()
230 .into_iter()
231 .map(|(name, bounds)| (1, name, bounds)),
232 );
233
234 while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
235 info!(
236 "{:indent$}- {}: {} minimum, {} firm",
237 "",
238 component_name,
239 bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
240 bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
241 indent = depth * 2
242 );
243
244 let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
245 while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
246 to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
247 }
248 }
249
250 info!("");
251}
252
253struct ResourceGroupMetrics {
254 totals: ResourceStatsSnapshot,
255 allocated_bytes_total: Counter,
256 allocated_bytes_live: Gauge,
257 allocated_objects_total: Counter,
258 allocated_objects_live: Gauge,
259 deallocated_bytes_total: Counter,
260 deallocated_objects_total: Counter,
261 cpu_time_nanos_total: Counter,
262}
263
264impl ResourceGroupMetrics {
265 fn new(group_name: &str) -> Self {
266 Self {
267 totals: ResourceStatsSnapshot::empty(),
268 allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
269 allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
270 allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
271 allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
272 deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
273 deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
274 cpu_time_nanos_total: counter!(level: Level::DEBUG, "group_cpu_time_nanos_total", "group_id" => group_name.to_string()),
275 }
276 }
277
278 fn update(&mut self, stats: &ResourceStats) {
279 let delta = stats.snapshot_delta(&self.totals);
280
281 self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
282 self.allocated_objects_total.increment(delta.allocated_objects as u64);
283 self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
284 self.deallocated_objects_total
285 .increment(delta.deallocated_objects as u64);
286 self.cpu_time_nanos_total.increment(delta.cpu_time_nanos);
287
288 self.totals.merge(&delta);
289 self.allocated_bytes_live
290 .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
291 self.allocated_objects_live
292 .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
293 }
294}
295
296pub struct ResourceTelemetryWorker {
301 component_registry: ComponentRegistryHandle,
302}
303
304impl ResourceTelemetryWorker {
305 pub fn new(component_registry: &ComponentRegistry) -> Self {
307 Self {
308 component_registry: component_registry.root(),
309 }
310 }
311}
312
313#[async_trait]
314impl Supervisable for ResourceTelemetryWorker {
315 fn name(&self) -> &str {
316 "resource-telemetry"
317 }
318
319 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
320 if !ResourceGroupRegistry::allocator_installed() {
324 warn!("Tracking allocator not installed. Memory telemetry will not be available.");
325 }
326
327 let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
328
329 Ok(Box::pin(async move {
330 DataspaceRegistry::try_current()
332 .ok_or_else(|| generic_error!("Dataspace not available."))?
333 .assert(memory_routes, "resource-telemetry-api");
334
335 select! {
336 _ = process_shutdown => {},
337 _ = run_resource_group_metrics_loop() => {},
338 }
339
340 Ok(())
341 }))
342 }
343}
344
345async fn run_resource_group_metrics_loop() {
346 let mut metrics = FastHashMap::default();
347
348 loop {
349 ResourceGroupRegistry::global().visit_resource_groups(|group_name, stats| {
350 let group_metrics = match metrics.get_mut(group_name) {
351 Some(group_metrics) => group_metrics,
352 None => metrics
353 .entry(group_name.to_string())
354 .or_insert_with(|| ResourceGroupMetrics::new(group_name)),
355 };
356
357 group_metrics.update(stats);
358 });
359
360 sleep(Duration::from_secs(1)).await;
361 }
362}
363
364struct CgroupMemoryParser;
365
366impl CgroupMemoryParser {
367 fn parse(self) -> Option<ByteSize> {
371 let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
372 let parts: Vec<&str> = contents.trim().split("\n").collect();
373 if parts.len() == 1 {
375 return self.parse_controller_v2(parts[0]);
376 }
377 for line in parts {
378 if line.contains(":memory:") {
379 return self.parse_controller_v1(line);
380 }
381 }
382 None
383 }
384
385 fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
386 let path = controller.split(":").nth(2)?;
387 let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
388 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
389 self.convert_to_bytesize(&raw_memory_limit)
390 }
391
392 fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
393 let path = controller.split(":").nth(2)?;
394 let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
395 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
396 self.convert_to_bytesize(&raw_memory_limit)
397 }
398
399 fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
400 let memory = s.trim().to_string();
401 if memory == "max" {
402 return None;
403 }
404 memory.parse::<ByteSize>().ok()
405 }
406}
407
408fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
409 ByteSize::b(bytes as u64).display().si()
410}