memory_accounting/allocator.rs
//! Global allocator implementation that allows tracking allocations on a per-group basis.

// TODO: The current design does not allow for deregistering groups, which is currently fine and
// likely will be for a while, but would be a limitation in a world where we dynamically launched
// data pipelines and wanted to clean up removed components, and so on.

use std::{
    alloc::{GlobalAlloc, Layout},
    cell::RefCell,
    collections::HashMap,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    ptr::NonNull,
    sync::{
        atomic::{AtomicUsize, Ordering::Relaxed},
        Mutex, OnceLock,
    },
    task::{Context, Poll},
};

use pin_project::pin_project;

const STATS_LAYOUT: Layout = Layout::new::<*const AllocationStats>();

static REGISTRY: OnceLock<AllocationGroupRegistry> = OnceLock::new();
static ROOT_GROUP: AllocationStats = AllocationStats::new();

thread_local! {
    static CURRENT_GROUP: RefCell<NonNull<AllocationStats>> = RefCell::new(NonNull::from(&ROOT_GROUP));
}

/// A global allocator that tracks allocations on a per-group basis.
///
/// This allocator provides the ability to track allocations and deallocations, both in bytes and in objects, for
/// different, user-defined allocation groups.
///
/// ## Allocation groups
///
/// Allocations (and deallocations) are tracked by **allocation group**. When this allocator is used, every allocation
/// is associated with an allocation group. Allocation groups are user-defined, except for the default "root" allocation
/// group, which acts as a catch-all for allocations made while no user-defined group is entered.
///
/// ## Token guard
///
/// When an allocation group is registered, an `AllocationGroupToken` is returned. This token can be used to "enter" the
/// group, which attributes all allocations on the current thread to that group. Entering the group returns a drop guard
/// that restores the previously entered allocation group when it is dropped.
///
/// This allows for arbitrarily nested allocation groups.
///
/// ## Changes to memory layout
///
/// In order to associate an allocation with the current allocation group, a small trailer is added to the requested
/// allocation layout, in the form of a pointer to the statistics for the allocation group. This allows updating the
/// statistics directly when an allocation is deallocated, without having to externally keep track of which group a
/// given allocation belongs to. These statistics are updated directly when the allocation is initially made, and when
/// it is deallocated.
///
/// This means that every requested allocation ends up being at least one machine word larger: 4 bytes on 32-bit
/// systems, and 8 bytes on 64-bit systems, plus any padding required to keep the trailer pointer-aligned.
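///
/// ## Usage
///
/// A minimal sketch of installing this allocator as the global allocator, wrapping the system allocator (assuming
/// this module is reachable as `memory_accounting::allocator`, as in the examples below):
///
/// ```rust,no_run
/// use memory_accounting::allocator::TrackingAllocator;
///
/// // Wrap the system allocator so all allocations flow through the tracking layer.
/// #[global_allocator]
/// static ALLOCATOR: TrackingAllocator<std::alloc::System> = TrackingAllocator::new(std::alloc::System);
///
/// fn main() {
///     // From here on, allocations are attributed to the root group until a
///     // user-defined group is registered and entered.
/// }
/// ```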
pub struct TrackingAllocator<A> {
    allocator: A,
}

impl<A> TrackingAllocator<A> {
    /// Creates a new `TrackingAllocator` that wraps another allocator.
    ///
    /// The wrapped allocator is used to actually allocate and deallocate memory, while this allocator is responsible
    /// purely for tracking the allocations and deallocations themselves.
    pub const fn new(allocator: A) -> Self {
        Self { allocator }
    }
}

unsafe impl<A> GlobalAlloc for TrackingAllocator<A>
where
    A: GlobalAlloc,
{
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // Adjust the requested layout to fit our trailer and then try to allocate it.
        let (layout, trailer_start) = get_layout_with_group_trailer(layout);
        let layout_size = layout.size();
        let ptr = self.allocator.alloc(layout);
        if ptr.is_null() {
            return ptr;
        }

        // Store the pointer to the current allocation group in the trailer, and also update the statistics.
        let trailer_ptr = ptr.add(trailer_start) as *mut *mut AllocationStats;
        CURRENT_GROUP.with(|current_group| {
            let group_ptr = current_group.borrow();
            group_ptr.as_ref().track_allocation(layout_size);

            trailer_ptr.write(group_ptr.as_ptr());
        });

        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // Read the pointer to the owning allocation group from the trailer and update the statistics.
        let (layout, trailer_start) = get_layout_with_group_trailer(layout);
        let trailer_ptr = ptr.add(trailer_start) as *mut *mut AllocationStats;
        let group = (*trailer_ptr).as_ref().unwrap();
        group.track_deallocation(layout.size());

        // Deallocate the memory.
        self.allocator.dealloc(ptr, layout);
    }
}

fn get_layout_with_group_trailer(layout: Layout) -> (Layout, usize) {
    let (new_layout, trailer_start) = layout.extend(STATS_LAYOUT).unwrap();
    (new_layout.pad_to_align(), trailer_start)
}

/// Statistics for an allocation group.
pub struct AllocationStats {
    allocated_bytes: AtomicUsize,
    allocated_objects: AtomicUsize,
    deallocated_bytes: AtomicUsize,
    deallocated_objects: AtomicUsize,
}

impl AllocationStats {
    const fn new() -> Self {
        Self {
            allocated_bytes: AtomicUsize::new(0),
            allocated_objects: AtomicUsize::new(0),
            deallocated_bytes: AtomicUsize::new(0),
            deallocated_objects: AtomicUsize::new(0),
        }
    }

    /// Gets a reference to the statistics of the root allocation group.
    fn root() -> &'static Self {
        &ROOT_GROUP
    }

    /// Returns `true` if the given group has allocated any memory at all.
    pub fn has_allocated(&self) -> bool {
        self.allocated_bytes.load(Relaxed) > 0
    }

    #[inline]
    fn track_allocation(&self, size: usize) {
        self.allocated_bytes.fetch_add(size, Relaxed);
        self.allocated_objects.fetch_add(1, Relaxed);
    }

    #[inline]
    fn track_deallocation(&self, size: usize) {
        self.deallocated_bytes.fetch_add(size, Relaxed);
        self.deallocated_objects.fetch_add(1, Relaxed);
    }

    /// Captures a snapshot of the current statistics based on the delta from a previous snapshot.
    ///
    /// This can be used to keep a single local snapshot of the last delta, and then both track the delta since that
    /// snapshot, as well as update the snapshot to the current statistics.
    ///
    /// Callers should generally create their snapshot via [`AllocationStatsSnapshot::empty`] and then use this method
    /// to get their snapshot delta, utilize those delta values in whatever way is necessary, and then merge the
    /// snapshot delta into the primary snapshot via [`AllocationStatsSnapshot::merge`] to make it current.
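    ///
    /// # Examples
    ///
    /// A sketch of the intended pattern, assuming a reference to some group's statistics (for example, one obtained
    /// from [`AllocationGroupRegistry::visit_allocation_groups`]); the hidden wrapper and `report` helper are purely
    /// illustrative:
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationStats, AllocationStatsSnapshot};
    ///
    /// # fn report(_delta: &AllocationStatsSnapshot) {}
    /// # fn doc(stats: &AllocationStats) {
    /// let mut snapshot = AllocationStatsSnapshot::empty();
    ///
    /// // Periodically capture the delta since the last snapshot...
    /// let delta = stats.snapshot_delta(&snapshot);
    ///
    /// // ...use it (emit metrics, log it, etc)...
    /// report(&delta);
    ///
    /// // ...and then fold it back in so the next delta is relative to "now".
    /// snapshot.merge(&delta);
    /// # }
    /// ```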
    pub fn snapshot_delta(&self, previous: &AllocationStatsSnapshot) -> AllocationStatsSnapshot {
        AllocationStatsSnapshot {
            allocated_bytes: self.allocated_bytes.load(Relaxed) - previous.allocated_bytes,
            allocated_objects: self.allocated_objects.load(Relaxed) - previous.allocated_objects,
            deallocated_bytes: self.deallocated_bytes.load(Relaxed) - previous.deallocated_bytes,
            deallocated_objects: self.deallocated_objects.load(Relaxed) - previous.deallocated_objects,
        }
    }
}

/// Snapshot of allocation statistics for a group.
pub struct AllocationStatsSnapshot {
    /// Number of allocated bytes since the last snapshot.
    pub allocated_bytes: usize,

    /// Number of allocated objects since the last snapshot.
    pub allocated_objects: usize,

    /// Number of deallocated bytes since the last snapshot.
    pub deallocated_bytes: usize,

    /// Number of deallocated objects since the last snapshot.
    pub deallocated_objects: usize,
}

impl AllocationStatsSnapshot {
    /// Creates an empty `AllocationStatsSnapshot`.
    pub const fn empty() -> Self {
        Self {
            allocated_bytes: 0,
            allocated_objects: 0,
            deallocated_bytes: 0,
            deallocated_objects: 0,
        }
    }

    /// Returns the number of live allocated bytes.
    pub fn live_bytes(&self) -> usize {
        self.allocated_bytes - self.deallocated_bytes
    }

    /// Returns the number of live allocated objects.
    pub fn live_objects(&self) -> usize {
        self.allocated_objects - self.deallocated_objects
    }

    /// Merges `other` into `self`.
    ///
    /// This can be used to accumulate the total number of (de)allocated bytes and objects when handling the deltas
    /// generated from [`AllocationStats::snapshot_delta`].
    pub fn merge(&mut self, other: &Self) {
        self.allocated_bytes += other.allocated_bytes;
        self.allocated_objects += other.allocated_objects;
        self.deallocated_bytes += other.deallocated_bytes;
        self.deallocated_objects += other.deallocated_objects;
    }
}

/// A token associated with a specific allocation group.
///
/// Used to attribute allocations and deallocations to a specific group with a scope guard [`TrackingGuard`], or
/// through helpers provided by the [`Track`] trait.
#[derive(Clone, Copy)]
pub struct AllocationGroupToken {
    group_ptr: NonNull<AllocationStats>,
}

impl AllocationGroupToken {
    fn new(group_ptr: NonNull<AllocationStats>) -> Self {
        Self { group_ptr }
    }

    /// Returns an `AllocationGroupToken` for the current allocation group.
    pub fn current() -> Self {
        CURRENT_GROUP.with(|current_group| {
            let group_ptr = current_group.borrow();
            Self::new(*group_ptr)
        })
    }

    #[cfg(test)]
    fn ptr_eq(&self, other: &Self) -> bool {
        self.group_ptr == other.group_ptr
    }

    /// Returns the token for the root allocation group.
    pub(crate) fn root() -> Self {
        Self::new(NonNull::from(&ROOT_GROUP))
    }

    /// Enters this allocation group, returning a guard that will exit the allocation group when dropped.
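    ///
    /// # Examples
    ///
    /// A sketch of entering a group and restoring the previous group when the guard drops (the group name is
    /// illustrative):
    ///
    /// ```rust
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// let token = AllocationGroupRegistry::global().register_allocation_group("my-group");
    /// {
    ///     let _guard = token.enter();
    ///     // Allocations on this thread are now attributed to "my-group"...
    /// }
    /// // ...and the previously entered group (here, the root group) is restored once the guard drops.
    /// ```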
    pub fn enter(&self) -> TrackingGuard<'_> {
        // Swap the current group to the one we're tracking.
        CURRENT_GROUP.with(|current_group| {
            let mut group_ptr = current_group.borrow_mut();
            let previous_group_ptr = *group_ptr;
            *group_ptr = self.group_ptr;

            TrackingGuard {
                previous_group_ptr,
                _token: PhantomData,
            }
        })
    }
}

// SAFETY: There's nothing inherently thread-specific about the token.
unsafe impl Send for AllocationGroupToken {}

// SAFETY: There's nothing unsafe about sharing the token between threads, as it's safe to enter the same token on
// multiple threads at the same time, and the token itself has no internal state or interior mutability.
unsafe impl Sync for AllocationGroupToken {}

/// A guard representing an allocation group which has been entered.
///
/// When the guard is dropped, the allocation group will be exited and the previously entered
/// allocation group will be restored.
///
/// This is returned by the [`AllocationGroupToken::enter`] method.
pub struct TrackingGuard<'a> {
    previous_group_ptr: NonNull<AllocationStats>,
    _token: PhantomData<&'a AllocationGroupToken>,
}

impl Drop for TrackingGuard<'_> {
    fn drop(&mut self) {
        // Reset the current group to the one that existed before we entered.
        CURRENT_GROUP.with(|current_group| {
            let mut group_ptr = current_group.borrow_mut();
            *group_ptr = self.previous_group_ptr;
        });
    }
}

/// An object wrapper that tracks allocations and attributes them to a specific group.
///
/// Provides methods and implementations to help ensure that operations against/using the wrapped object have all
/// allocations properly tracked and attributed to a given group.
///
/// Implements [`Future`] when the wrapped object itself implements [`Future`].
//
// TODO: A more complete example of this sort of thing is `tracing::Instrumented`, where they also have some fancy code
// to trace execution even in the drop logic of the wrapped future. I'm not sure we need that here, because we don't
// care about what components an object is deallocated in, and I don't think we expect to have any futures where the
// drop logic actually _allocates_, and certainly not in a way where we want to attribute it to that future's attached
// component... but for posterity, I'm mentioning it here since we _might_ consider doing it. Might.
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Tracked<Inner> {
    token: AllocationGroupToken,

    #[pin]
    inner: Inner,
}

impl<Inner> Tracked<Inner> {
    /// Consumes this object and returns the inner object and tracking token.
    pub fn into_parts(self) -> (AllocationGroupToken, Inner) {
        (self.token, self.inner)
    }
}

impl<Inner> Future for Tracked<Inner>
where
    Inner: Future,
{
    type Output = Inner::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let _enter = this.token.enter();
        this.inner.poll(cx)
    }
}

/// Attaches allocation groups to a [`Future`].
pub trait Track: Sized {
    /// Instruments this type by attaching the given allocation group token, returning a `Tracked` wrapper.
    ///
    /// The allocation group will be entered every time the wrapped future is polled.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _};
    ///
    /// # async fn doc() {
    /// let future = async {
    ///     // All allocations in this future will be attached to the allocation group
    ///     // represented by `token`...
    /// };
    ///
    /// let token = AllocationGroupRegistry::global().register_allocation_group("my-group");
    /// future
    ///     .track_allocations(token)
    ///     .await
    /// # }
    /// ```
    fn track_allocations(self, token: AllocationGroupToken) -> Tracked<Self> {
        Tracked { token, inner: self }
    }

    /// Instruments this type by attaching the current allocation group, returning a `Tracked` wrapper.
    ///
    /// The allocation group will be entered every time the wrapped future is polled.
    ///
    /// This can be used to propagate the current allocation group when spawning a new future.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _};
    ///
    /// # mod tokio {
    /// #     pub(super) fn spawn(_: impl std::future::Future) {}
    /// # }
    /// # async fn doc() {
    /// let token = AllocationGroupRegistry::global().register_allocation_group("my-group");
    /// let _enter = token.enter();
    ///
    /// // ...
    ///
    /// let future = async {
    ///     // All allocations in this future will be attached to the allocation group
    ///     // represented by `token`...
    /// };
    /// tokio::spawn(future.in_current_allocation_group());
    /// # }
    /// ```
    fn in_current_allocation_group(self) -> Tracked<Self> {
        Tracked {
            token: AllocationGroupToken::current(),
            inner: self,
        }
    }
}

impl<T: Sized> Track for T {}

/// A registry of allocation groups and the statistics for each of them.
pub struct AllocationGroupRegistry {
    allocation_groups: Mutex<HashMap<String, Box<AllocationStats>>>,
}

impl AllocationGroupRegistry {
    fn new() -> Self {
        in_root_allocation_group(|| Self {
            allocation_groups: Mutex::new(HashMap::with_capacity(4)),
        })
    }

    /// Gets a reference to the global allocation group registry.
    pub fn global() -> &'static Self {
        REGISTRY.get_or_init(Self::new)
    }

    /// Returns `true` if `TrackingAllocator` is installed as the global allocator.
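    ///
    /// # Examples
    ///
    /// A small sketch of gating statistics reporting on whether tracking is actually active:
    ///
    /// ```rust
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// if AllocationGroupRegistry::allocator_installed() {
    ///     // Allocation statistics will be meaningful, so it's worth reporting them.
    /// } else {
    ///     // The default allocator is in use, so statistics will generally read as zero.
    /// }
    /// ```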
    pub fn allocator_installed() -> bool {
        // Essentially, when we load the group registry, and it gets created for the first time, it will specifically
        // allocate its internal data structures while entered into the root allocation group.
        //
        // This means that if the allocator is installed, we should always have some allocations in the root group by
        // the time we call `AllocationStats::has_allocated`.
        AllocationStats::root().has_allocated()
    }

    /// Registers a new allocation group with the given name.
    ///
    /// Returns an `AllocationGroupToken` that can be used to attribute allocations and deallocations to the
    /// newly-created allocation group. If a group with the given name has already been registered, a token for the
    /// existing group is returned instead.
    pub fn register_allocation_group<S>(&self, name: S) -> AllocationGroupToken
    where
        S: AsRef<str>,
    {
        in_root_allocation_group(|| {
            let mut allocation_groups = self.allocation_groups.lock().unwrap();
            match allocation_groups.get(name.as_ref()) {
                Some(stats) => AllocationGroupToken::new(NonNull::from(&**stats)),
                None => {
                    let allocation_group_stats = Box::new(AllocationStats::new());
                    let token = AllocationGroupToken::new(NonNull::from(&*allocation_group_stats));

                    allocation_groups.insert(name.as_ref().to_string(), allocation_group_stats);

                    token
                }
            }
        })
    }

    /// Visits all allocation groups in the registry and calls the given closure with their names and statistics.
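    ///
    /// # Examples
    ///
    /// A sketch of dumping the name of every registered group (plus the root group, which is always visited first):
    ///
    /// ```rust
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// AllocationGroupRegistry::global().visit_allocation_groups(|name, stats| {
    ///     println!("group '{}' has allocated memory: {}", name, stats.has_allocated());
    /// });
    /// ```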
    pub fn visit_allocation_groups<F>(&self, mut f: F)
    where
        F: FnMut(&str, &AllocationStats),
    {
        in_root_allocation_group(|| {
            f("root", &ROOT_GROUP);

            let allocation_groups = self.allocation_groups.lock().unwrap();
            for (name, stats) in allocation_groups.iter() {
                f(name, stats);
            }
        });
    }
}

fn in_root_allocation_group<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    let token = AllocationGroupToken::root();
    let _enter = token.enter();
    f()
}

#[cfg(test)]
mod tests {
    use super::AllocationGroupRegistry;

    #[test]
    fn existing_group() {
        let registry = AllocationGroupRegistry::new();
        let token = registry.register_allocation_group("test");
        let token2 = registry.register_allocation_group("test");
        let token3 = registry.register_allocation_group("test2");

        assert!(token.ptr_eq(&token2));
        assert!(!token.ptr_eq(&token3));
    }

    #[test]
    fn visit_allocation_groups() {
        let registry = AllocationGroupRegistry::new();
        let _token = registry.register_allocation_group("my-group");

        let mut visited = Vec::new();
        registry.visit_allocation_groups(|name, _stats| {
            visited.push(name.to_string());
        });

        assert_eq!(visited.len(), 2);
        assert_eq!(visited[0], "root");
        assert_eq!(visited[1], "my-group");
    }
}
509}