memory_accounting/
limiter.rs

1use std::{
2    sync::{
3        atomic::{AtomicU64, Ordering::Relaxed},
4        Arc,
5    },
6    time::Duration,
7};
8
9use metrics::gauge;
10use process_memory::Querier;
11use tracing::debug;
12
13use crate::MemoryGrant;
14
/// A process-wide memory limiter.
///
/// In many cases, it can be useful to know when the process has exceeded a certain memory usage threshold in order to
/// be able to react to that: clearing caches, temporarily blocking requests, and so on.
///
/// `MemoryLimiter` watches the process's physical memory usage (Resident Set Size/Working Set) and keeps track of when
/// the usage exceeds the configured limit. Cooperating tasks call `wait_for_capacity` to participate in accepting
/// backpressure that can be used to throttle work that is responsible for allocating memory.
///
/// The backpressure is scaled based on how close the memory usage is to the configured limit, with a minimum and
/// maximum backoff duration that is applied. This means that until we are using a certain percentage of the configured
/// limit, no backpressure is applied. Once that threshold is crossed, backpressure is applied proportionally to how
/// close to the limit we are. Callers are never fully blocked even if the limit is reached.
#[derive(Clone)]
pub struct MemoryLimiter {
    // Backoff currently in force for `wait_for_capacity`, in nanoseconds; 0 means "no backoff". Written by the
    // background checker thread and read by callers, hence the shared atomic. Cloning the limiter shares this value.
    active_backoff_nanos: Arc<AtomicU64>,
}
32
33impl MemoryLimiter {
34    /// Creates a new `MemoryLimiter` based on the given `MemoryGrant`.
35    ///
36    /// A background task is spawned that frequently checks the used memory for the entire process, and callers of
37    /// `wait_for_capacity` will observe waiting once the memory usage exceeds the configured limit threshold. The
38    /// waiting time will scale progressively the closer the memory usage is to the configured limit.
39    ///
40    /// Defaults to a 95% threshold (i.e. threshold begins at 95% of the limit), a minimum backoff duration of 1ms, and
41    /// a maximum backoff duration of 25ms. The effective limit of the grant is used as the memory limit.
42    pub fn new(grant: MemoryGrant) -> Option<Self> {
43        // Smoke test to see if we can even collect memory stats on this system.
44        Querier::default().resident_set_size()?;
45
46        let rss_limit = grant.effective_limit_bytes();
47        let backoff_threshold = 0.95;
48        let backoff_min = Duration::from_millis(1);
49        let backoff_max = Duration::from_millis(25);
50
51        let active_backoff_nanos = Arc::new(AtomicU64::new(0));
52        let active_backoff_nanos2 = Arc::clone(&active_backoff_nanos);
53
54        std::thread::Builder::new()
55            .name("memory-limiter-checker".to_string())
56            .spawn(move || {
57                check_memory_usage(
58                    active_backoff_nanos2,
59                    rss_limit,
60                    backoff_threshold,
61                    backoff_min,
62                    backoff_max,
63                )
64            })
65            .unwrap();
66
67        Some(Self { active_backoff_nanos })
68    }
69
70    /// Creates a no-op `MemoryLimiter` that does not perform any limiting.
71    ///
72    /// All calls to `wait_for_capacity` will return immediately.
73    pub fn noop() -> Self {
74        Self {
75            active_backoff_nanos: Arc::new(AtomicU64::new(0)),
76        }
77    }
78
79    /// Waits a short period of time based on the available memory.
80    ///
81    /// If the used memory is not within the threshold of the configured limit, no waiting will occur. Otherwise, the
82    /// call will wait a variable amount of time depending on how close to the configured limit the process is.
83    pub async fn wait_for_capacity(&self) {
84        let active_backoff_nanos = self.active_backoff_nanos.load(Relaxed);
85        if active_backoff_nanos > 0 {
86            tokio::time::sleep(Duration::from_nanos(active_backoff_nanos)).await;
87        }
88    }
89}
90
91fn check_memory_usage(
92    active_backoff_nanos: Arc<AtomicU64>, rss_limit: usize, backoff_threshold: f64, backoff_min: Duration,
93    backoff_max: Duration,
94) {
95    debug!("Memory limiter checker started.");
96
97    let mut querier = Querier::default();
98
99    loop {
100        let actual_rss = querier
101            .resident_set_size()
102            .expect("memory statistics should be available");
103
104        let maybe_backoff_duration =
105            calculate_backoff(rss_limit, actual_rss, backoff_threshold, backoff_min, backoff_max);
106        match maybe_backoff_duration {
107            Some(backoff_dur) => {
108                active_backoff_nanos.store(backoff_dur.as_nanos() as u64, Relaxed);
109
110                debug!(rss_limit, actual_rss, current_backoff = ?backoff_dur, "Enforcing backoff due to memory pressure.");
111                gauge!("memory_limiter.current_backoff_secs").set(backoff_dur.as_secs_f64());
112            }
113            None => {
114                active_backoff_nanos.store(0, Relaxed);
115
116                gauge!("memory_limiter.current_backoff_secs").set(0.0);
117            }
118        }
119
120        std::thread::sleep(Duration::from_millis(250));
121    }
122}
123
/// Calculates the backoff duration to apply given the current RSS relative to the configured limit.
///
/// Returns `None` when `actual_rss` is at or below `backoff_threshold` of `rss_limit` (no backpressure needed).
/// Otherwise, returns a duration linearly interpolated between `backoff_min` (right at the threshold) and
/// `backoff_max` (at or beyond the limit).
fn calculate_backoff(
    rss_limit: usize, actual_rss: usize, backoff_threshold: f64, backoff_min: Duration, backoff_max: Duration,
) -> Option<Duration> {
    let threshold_rss = rss_limit as f64 * backoff_threshold;
    if actual_rss as f64 > threshold_rss {
        // When we're over the threshold, we want to scale our backoff range across the remainder of the threshold.
        //
        // For example, if our minimum and maximum backoff durations are 100ms and 1000ms, respectively, and our limit
        // is 100MB with a threshold of 95% (0.95), then we would start backing off by 100ms when we hit 95MB, and we
        // would back off at a maximum of 1000ms at 100% (or greater) of our limit.
        //
        // Between those two points, we would spread the difference of the minimum/maximum backoff duration (1000ms -
        // 100ms => 900ms) across that 5%. Thus, we would expect that at 97.5% of the limit, we would be backing off by
        // 550ms. (0.5 * 900ms => 450ms, 100ms + 450ms => 550ms)
        let rss_backoff_range = rss_limit as f64 - threshold_rss;
        let threshold_delta = actual_rss as f64 - threshold_rss;

        // Clamp the interpolation factor to [0, 1]: this caps the result at `backoff_max` when we're over the limit,
        // and also guards against a zero-width range (e.g. a threshold of 1.0, or a zero limit), where the division
        // would produce a non-finite value and `Duration::from_secs_f64` would panic.
        let scale = (threshold_delta / rss_backoff_range).clamp(0.0, 1.0);
        let backoff_duration_range = backoff_max.saturating_sub(backoff_min);
        let variable_backoff_duration = backoff_duration_range.as_secs_f64() * scale;

        Some(backoff_min + Duration::from_secs_f64(variable_backoff_duration))
    } else {
        None
    }
}
151}