memory_accounting/limiter.rs
1use std::{
2 sync::{
3 atomic::{AtomicU64, Ordering::Relaxed},
4 Arc,
5 },
6 time::Duration,
7};
8
9use metrics::gauge;
10use process_memory::Querier;
11use tracing::debug;
12
13use crate::MemoryGrant;
14
/// A process-wide memory limiter.
///
/// In many cases, it can be useful to know when the process has exceeded a certain memory usage threshold in order to
/// be able to react that: clearing caches, temporarily blocking requests, and so on.
///
/// `MemoryLimiter` watches the process's physical memory usage (Resident Set Size/Working Set) and keeps track of when
/// the usage exceeds the configured limit. Cooperating tasks call `wait_for_capacity` to participate in accepting
/// backpressure that can be used to throttle work that is responsible for allocating memory.
///
/// The backpressure is scaled based on how close the memory usage is to the configured limit, with a minimum and
/// maximum backoff duration that is applied. This means that until we are using a certain percentage of the configured
/// limit, no backpressure is applied. Once that threshold is crossed, backpressure is applied proportionally to how
/// close to the limit we are. Callers are never fully blocked even if the limit is reached.
#[derive(Clone)]
pub struct MemoryLimiter {
    // Backoff duration, in nanoseconds, that `wait_for_capacity` callers currently sleep for.
    // Zero means no backpressure. Written by the background checker thread spawned in `new`,
    // read with relaxed ordering by callers; shared via `Arc` so clones observe the same value.
    active_backoff_nanos: Arc<AtomicU64>,
}
32
33impl MemoryLimiter {
34 /// Creates a new `MemoryLimiter` based on the given `MemoryGrant`.
35 ///
36 /// A background task is spawned that frequently checks the used memory for the entire process, and callers of
37 /// `wait_for_capacity` will observe waiting once the memory usage exceeds the configured limit threshold. The
38 /// waiting time will scale progressively the closer the memory usage is to the configured limit.
39 ///
40 /// Defaults to a 95% threshold (i.e. threshold begins at 95% of the limit), a minimum backoff duration of 1ms, and
41 /// a maximum backoff duration of 25ms. The effective limit of the grant is used as the memory limit.
42 pub fn new(grant: MemoryGrant) -> Option<Self> {
43 // Smoke test to see if we can even collect memory stats on this system.
44 Querier::default().resident_set_size()?;
45
46 let rss_limit = grant.effective_limit_bytes();
47 let backoff_threshold = 0.95;
48 let backoff_min = Duration::from_millis(1);
49 let backoff_max = Duration::from_millis(25);
50
51 let active_backoff_nanos = Arc::new(AtomicU64::new(0));
52 let active_backoff_nanos2 = Arc::clone(&active_backoff_nanos);
53
54 std::thread::Builder::new()
55 .name("memory-limiter-checker".to_string())
56 .spawn(move || {
57 check_memory_usage(
58 active_backoff_nanos2,
59 rss_limit,
60 backoff_threshold,
61 backoff_min,
62 backoff_max,
63 )
64 })
65 .unwrap();
66
67 Some(Self { active_backoff_nanos })
68 }
69
70 /// Creates a no-op `MemoryLimiter` that does not perform any limiting.
71 ///
72 /// All calls to `wait_for_capacity` will return immediately.
73 pub fn noop() -> Self {
74 Self {
75 active_backoff_nanos: Arc::new(AtomicU64::new(0)),
76 }
77 }
78
79 /// Waits a short period of time based on the available memory.
80 ///
81 /// If the used memory is not within the threshold of the configured limit, no waiting will occur. Otherwise, the
82 /// call will wait a variable amount of time depending on how close to the configured limit the process is.
83 pub async fn wait_for_capacity(&self) {
84 let active_backoff_nanos = self.active_backoff_nanos.load(Relaxed);
85 if active_backoff_nanos > 0 {
86 tokio::time::sleep(Duration::from_nanos(active_backoff_nanos)).await;
87 }
88 }
89}
90
91fn check_memory_usage(
92 active_backoff_nanos: Arc<AtomicU64>, rss_limit: usize, backoff_threshold: f64, backoff_min: Duration,
93 backoff_max: Duration,
94) {
95 debug!("Memory limiter checker started.");
96
97 let mut querier = Querier::default();
98
99 loop {
100 let actual_rss = querier
101 .resident_set_size()
102 .expect("memory statistics should be available");
103
104 let maybe_backoff_duration =
105 calculate_backoff(rss_limit, actual_rss, backoff_threshold, backoff_min, backoff_max);
106 match maybe_backoff_duration {
107 Some(backoff_dur) => {
108 active_backoff_nanos.store(backoff_dur.as_nanos() as u64, Relaxed);
109
110 debug!(rss_limit, actual_rss, current_backoff = ?backoff_dur, "Enforcing backoff due to memory pressure.");
111 gauge!("memory_limiter.current_backoff_secs").set(backoff_dur.as_secs_f64());
112 }
113 None => {
114 active_backoff_nanos.store(0, Relaxed);
115
116 gauge!("memory_limiter.current_backoff_secs").set(0.0);
117 }
118 }
119
120 std::thread::sleep(Duration::from_millis(250));
121 }
122}
123
/// Calculates the backoff duration to apply for the given RSS reading, if any.
///
/// Returns `None` while `actual_rss` is at or below `backoff_threshold` (a fraction of
/// `rss_limit`). Above the threshold, returns a duration scaled linearly between `backoff_min`
/// (at the threshold) and `backoff_max` (at or beyond the limit).
fn calculate_backoff(
    rss_limit: usize, actual_rss: usize, backoff_threshold: f64, backoff_min: Duration, backoff_max: Duration,
) -> Option<Duration> {
    let threshold_rss = rss_limit as f64 * backoff_threshold;
    if actual_rss as f64 <= threshold_rss {
        return None;
    }

    // When we're over the threshold, we want to scale our backoff range across the remainder of the threshold.
    //
    // For example, if our minimum and maximum backoff durations are 100ms and 1000ms, respectively, and our limit
    // is 100MB with a threshold of 95% (0.95), then we would start backing off by 100ms when we hit 95MB, and we
    // would back off at a maximum of 1000ms at 100% (or greater) of our limit.
    //
    // Between those two points, we would spread the difference of the minimum/maximum backoff duration (1000ms -
    // 100ms => 900ms) across that 5%. Thus, we would expect that at 97.5% of the limit, we would be backing off by
    // 550ms. (0.5 * 900ms => 450ms, 100ms + 450ms => 550ms)
    let rss_backoff_range = rss_limit as f64 - threshold_rss;
    if rss_backoff_range <= 0.0 {
        // Degenerate configuration (threshold at or above 100% of the limit): the proportional
        // scaling below would divide by zero, and `Duration::from_secs_f64` panics on non-finite
        // input. Being over the limit here, apply the maximum backoff directly.
        return Some(backoff_max);
    }

    let threshold_delta = actual_rss as f64 - threshold_rss;
    let backoff_duration_range = backoff_max - backoff_min;
    let variable_backoff_duration = backoff_duration_range.as_secs_f64() * (threshold_delta / rss_backoff_range);

    let backoff = backoff_min + Duration::from_secs_f64(variable_backoff_duration);
    Some(backoff.min(backoff_max))
}