1use std::{
4 collections::{HashMap, VecDeque},
5 env, fs,
6 time::Duration,
7};
8
9use bytesize::ByteSize;
10use metrics::{counter, gauge, Counter, Gauge, Level};
11use resource_accounting::{
12 ComponentBounds, ComponentRegistry, ComponentRegistryHandle, MemoryGrant, MemoryLimiter, ResourceGroupRegistry,
13 ResourceStats, ResourceStatsSnapshot,
14};
15use saluki_api::{DynamicRoute, EndpointType};
16use saluki_config::GenericConfiguration;
17use saluki_core::runtime::{
18 state::DataspaceRegistry, InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
19};
20use saluki_error::{generic_error, ErrorContext as _, GenericError};
21use serde::Deserialize;
22use tokio::{pin, select, time::sleep};
23use tonic::async_trait;
24use tracing::{error, info, warn};
25
/// Serde default for `MemoryBoundsConfiguration::memory_slop_factor`, used when the setting is
/// absent from the configuration.
const fn default_memory_slop_factor() -> f64 {
    const DEFAULT_SLOP_FACTOR: f64 = 0.25;
    DEFAULT_SLOP_FACTOR
}
29
/// Serde default for `MemoryBoundsConfiguration::enable_global_limiter`: the global limiter is
/// enabled unless the configuration says otherwise.
const fn default_enable_global_limiter() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
33
34#[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
36#[serde(rename_all = "lowercase")]
37pub enum MemoryMode {
38 #[default]
40 Disabled,
41
42 Permissive,
46
47 Strict,
51}
52
53#[derive(Deserialize)]
55pub struct MemoryBoundsConfiguration {
56 #[serde(default)]
63 memory_limit: Option<ByteSize>,
64
65 #[serde(default = "default_memory_slop_factor")]
76 memory_slop_factor: f64,
77
78 #[serde(default = "default_enable_global_limiter")]
85 enable_global_limiter: bool,
86
87 #[serde(default)]
93 memory_mode: MemoryMode,
94}
95
96impl MemoryBoundsConfiguration {
97 pub fn try_from_config(config: &GenericConfiguration) -> Result<Self, GenericError> {
103 let mut config = config
104 .as_typed::<Self>()
105 .error_context("Failed to parse memory bounds configuration.")?;
106
107 if config.memory_limit.is_none() {
108 if let Ok(value) = env::var("DOCKER_DD_AGENT") {
110 if !value.is_empty() {
111 let cgroup_memory_reader = CgroupMemoryParser;
112 if let Some(memory) = cgroup_memory_reader.parse() {
113 info!(
114 "Setting memory limit to {} based on detected cgroups limit.",
115 memory.display().si()
116 );
117 config.memory_limit = Some(memory);
118 }
119 }
120 }
121 }
122
123 if let Some(limit) = config.memory_limit {
125 let _ = MemoryGrant::with_slop_factor(limit.as_u64() as usize, config.memory_slop_factor)
126 .error_context("Given memory limit and/or slop factor invalid.")?;
127 }
128
129 Ok(config)
130 }
131
132 pub fn get_initial_grant(&self) -> Option<MemoryGrant> {
134 self.memory_limit.map(|limit| {
135 MemoryGrant::with_slop_factor(limit.as_u64() as usize, self.memory_slop_factor)
136 .expect("memory limit should be valid")
137 })
138 }
139}
140
141pub fn initialize_memory_bounds(
150 configuration: MemoryBoundsConfiguration, component_registry: ComponentRegistryHandle,
151) -> Result<MemoryLimiter, GenericError> {
152 let configured_grant = configuration
153 .memory_limit
154 .map(|limit| MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor))
155 .transpose()?;
156
157 let limiter_grant = match configuration.memory_mode {
158 MemoryMode::Disabled => {
159 info!("Memory limiting disabled.");
160 None
161 }
162 mode @ (MemoryMode::Permissive | MemoryMode::Strict) => match configured_grant {
163 Some(grant) => {
164 verify_bounds_for_mode(mode, grant, &component_registry)?;
165 Some(grant)
166 }
167 None => {
168 info!("No memory limit set for the process. Skipping memory bounds verification.");
169 None
170 }
171 },
172 };
173
174 let limiter = match limiter_grant {
175 Some(grant) if configuration.enable_global_limiter => MemoryLimiter::new(grant)
176 .ok_or_else(|| generic_error!("Memory statistics cannot be gathered on this system."))?,
177 _ => MemoryLimiter::noop(),
178 };
179
180 Ok(limiter)
181}
182
183fn verify_bounds_for_mode(
184 mode: MemoryMode, initial_grant: MemoryGrant, component_registry: &ComponentRegistryHandle,
185) -> Result<(), GenericError> {
186 match component_registry.verify_bounds(initial_grant) {
187 Ok(verified_bounds) => {
188 info!(
189 "Verified memory bounds. Minimum memory requirement of {}, with a calculated firm memory bound of {} out of {} available, from an initial {} grant.",
190 bytes_to_si_string(verified_bounds.total_minimum_required_bytes()),
191 bytes_to_si_string(verified_bounds.total_firm_limit_bytes()),
192 bytes_to_si_string(verified_bounds.total_available_bytes()),
193 bytes_to_si_string(initial_grant.initial_limit_bytes()),
194 );
195
196 print_bounds(verified_bounds.bounds());
197 Ok(())
198 }
199 Err(e) => {
200 let bounds = component_registry.as_bounds();
201 print_bounds(&bounds);
202
203 match mode {
204 MemoryMode::Strict => {
205 error!("Failed to verify memory bounds: {}.", e);
206 Err(generic_error!(
207 "Configured memory limit is insufficient for the current configuration."
208 ))
209 }
210 MemoryMode::Permissive => {
211 warn!(
212 "Configured memory limit ({}) may be insufficient for the current configuration. Memory limiting behavior will be best effort. Continuing.",
213 bytes_to_si_string(initial_grant.initial_limit_bytes()),
214 );
215 Ok(())
216 }
217 MemoryMode::Disabled => unreachable!("verify_bounds_for_mode is never called with Disabled mode"),
218 }
219 }
220 }
221}
222
223fn print_bounds(bounds: &ComponentBounds) {
224 info!("Breakdown of verified bounds:");
225 info!(
226 "- (root): {} minimum, {} firm",
227 bytes_to_si_string(bounds.total_minimum_required_bytes()),
228 bytes_to_si_string(bounds.total_firm_limit_bytes()),
229 );
230
231 let mut to_visit = VecDeque::new();
232 to_visit.extend(
233 bounds
234 .subcomponents()
235 .into_iter()
236 .map(|(name, bounds)| (1, name, bounds)),
237 );
238
239 while let Some((depth, component_name, component_bounds)) = to_visit.pop_front() {
240 info!(
241 "{:indent$}- {}: {} minimum, {} firm",
242 "",
243 component_name,
244 bytes_to_si_string(component_bounds.total_minimum_required_bytes()),
245 bytes_to_si_string(component_bounds.total_firm_limit_bytes()),
246 indent = depth * 2
247 );
248
249 let mut subcomponents = component_bounds.subcomponents().into_iter().collect::<Vec<_>>();
250 while let Some((subcomponent_name, subcomponent_bounds)) = subcomponents.pop() {
251 to_visit.push_front((depth + 1, subcomponent_name, subcomponent_bounds));
252 }
253 }
254
255 info!("");
256}
257
258struct ResourceGroupMetrics {
259 totals: ResourceStatsSnapshot,
260 allocated_bytes_total: Counter,
261 allocated_bytes_live: Gauge,
262 allocated_objects_total: Counter,
263 allocated_objects_live: Gauge,
264 deallocated_bytes_total: Counter,
265 deallocated_objects_total: Counter,
266 cpu_time_nanos_total: Counter,
267}
268
269impl ResourceGroupMetrics {
270 fn new(group_name: &str) -> Self {
271 Self {
272 totals: ResourceStatsSnapshot::empty(),
273 allocated_bytes_total: counter!(level: Level::DEBUG, "group_allocated_bytes_total", "group_id" => group_name.to_string()),
274 allocated_bytes_live: gauge!(level: Level::DEBUG, "group_allocated_bytes_live", "group_id" => group_name.to_string()),
275 allocated_objects_total: counter!(level: Level::DEBUG, "group_allocated_objects_total", "group_id" => group_name.to_string()),
276 allocated_objects_live: gauge!(level: Level::DEBUG, "group_allocated_objects_live", "group_id" => group_name.to_string()),
277 deallocated_bytes_total: counter!(level: Level::DEBUG, "group_deallocated_bytes_total", "group_id" => group_name.to_string()),
278 deallocated_objects_total: counter!(level: Level::DEBUG, "group_deallocated_objects_total", "group_id" => group_name.to_string()),
279 cpu_time_nanos_total: counter!(level: Level::DEBUG, "group_cpu_time_nanos_total", "group_id" => group_name.to_string()),
280 }
281 }
282
283 fn update(&mut self, stats: &ResourceStats) {
284 let delta = stats.snapshot_delta(&self.totals);
285
286 self.allocated_bytes_total.increment(delta.allocated_bytes as u64);
287 self.allocated_objects_total.increment(delta.allocated_objects as u64);
288 self.deallocated_bytes_total.increment(delta.deallocated_bytes as u64);
289 self.deallocated_objects_total
290 .increment(delta.deallocated_objects as u64);
291 self.cpu_time_nanos_total.increment(delta.cpu_time_nanos);
292
293 self.totals.merge(&delta);
294 self.allocated_bytes_live
295 .set((self.totals.allocated_bytes - self.totals.deallocated_bytes) as f64);
296 self.allocated_objects_live
297 .set((self.totals.allocated_objects - self.totals.deallocated_objects) as f64);
298 }
299}
300
301pub struct ResourceTelemetryWorker {
306 component_registry: ComponentRegistryHandle,
307}
308
309impl ResourceTelemetryWorker {
310 pub fn new(component_registry: &ComponentRegistry) -> Self {
312 Self {
313 component_registry: component_registry.root(),
314 }
315 }
316}
317
318#[async_trait]
319impl Supervisable for ResourceTelemetryWorker {
320 fn name(&self) -> &str {
321 "resource-telemetry"
322 }
323
324 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
325 if !ResourceGroupRegistry::allocator_installed() {
329 warn!("Tracking allocator not installed. Memory telemetry will not be available.");
330 }
331
332 let memory_routes = DynamicRoute::http(EndpointType::Unprivileged, self.component_registry.api_handler());
333
334 Ok(Box::pin(async move {
335 DataspaceRegistry::try_current()
337 .ok_or_else(|| generic_error!("Dataspace not available."))?
338 .assert(memory_routes, "resource-telemetry-api");
339
340 let mut metrics = HashMap::new();
341
342 let shutdown = process_shutdown.wait_for_shutdown();
343 pin!(shutdown);
344
345 loop {
346 select! {
347 _ = &mut shutdown => break,
348 _ = sleep(Duration::from_secs(1)) => {
349 ResourceGroupRegistry::global().visit_resource_groups(|group_name, stats| {
350 let group_metrics = match metrics.get_mut(group_name) {
351 Some(group_metrics) => group_metrics,
352 None => metrics
353 .entry(group_name.to_string())
354 .or_insert_with(|| ResourceGroupMetrics::new(group_name)),
355 };
356
357 group_metrics.update(stats);
358 });
359 }
360 }
361 }
362
363 Ok(())
364 }))
365 }
366}
367
368struct CgroupMemoryParser;
369
370impl CgroupMemoryParser {
371 fn parse(self) -> Option<ByteSize> {
375 let contents = fs::read_to_string("/proc/self/cgroup").ok()?;
376 let parts: Vec<&str> = contents.trim().split("\n").collect();
377 if parts.len() == 1 {
379 return self.parse_controller_v2(parts[0]);
380 }
381 for line in parts {
382 if line.contains(":memory:") {
383 return self.parse_controller_v1(line);
384 }
385 }
386 None
387 }
388
389 fn parse_controller_v1(self, controller: &str) -> Option<ByteSize> {
390 let path = controller.split(":").nth(2)?;
391 let memory_path = format!("/sys/fs/cgroup/memory{}/memory.limit_in_bytes", path);
392 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
393 self.convert_to_bytesize(&raw_memory_limit)
394 }
395
396 fn parse_controller_v2(self, controller: &str) -> Option<ByteSize> {
397 let path = controller.split(":").nth(2)?;
398 let memory_path = format!("/sys/fs/cgroup{}/memory.max", path);
399 let raw_memory_limit = fs::read_to_string(memory_path).ok()?;
400 self.convert_to_bytesize(&raw_memory_limit)
401 }
402
403 fn convert_to_bytesize(self, s: &str) -> Option<ByteSize> {
404 let memory = s.trim().to_string();
405 if memory == "max" {
406 return None;
407 }
408 memory.parse::<ByteSize>().ok()
409 }
410}
411
412fn bytes_to_si_string(bytes: usize) -> bytesize::Display {
413 ByteSize::b(bytes as u64).display().si()
414}