memory_accounting/
allocator.rs

//! Global allocator implementation that allows tracking allocations on a per-group basis.

// TODO: The current design does not allow for deregistering groups, which is currently fine and
// likely will be for a while, but would be a limitation in a world where we dynamically launched
// data pipelines and wanted to clean up removed components, and so on.

use std::{
    alloc::{GlobalAlloc, Layout},
    cell::RefCell,
    collections::HashMap,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    ptr::NonNull,
    sync::{
        atomic::{AtomicUsize, Ordering::Relaxed},
        Mutex, OnceLock,
    },
    task::{Context, Poll},
};

use pin_project::pin_project;

const STATS_LAYOUT: Layout = Layout::new::<*const AllocationStats>();

static REGISTRY: OnceLock<AllocationGroupRegistry> = OnceLock::new();
static ROOT_GROUP: AllocationStats = AllocationStats::new();

thread_local! {
    static CURRENT_GROUP: RefCell<NonNull<AllocationStats>> = RefCell::new(NonNull::from(&ROOT_GROUP));
}
/// A global allocator that tracks allocations on a per-group basis.
///
/// This allocator provides the ability to track allocations and deallocations, both in bytes and objects, for
/// different, user-defined allocation groups.
///
/// ## Allocation groups
///
/// Allocations (and deallocations) are tracked by **allocation group**. When this allocator is used, every allocation
/// is associated with an allocation group. Allocation groups are user-defined, except for the default "root" allocation
/// group which acts as a catch-all for allocations when a user-defined group is not entered.
///
/// ## Token guard
///
/// When an allocation group is registered, an `AllocationGroupToken` is returned. This token can be used to "enter" the
/// group, which attributes all allocations on the current thread to that group. Entering the group returns a drop guard
/// that restores the previously entered allocation group when it is dropped.
///
/// This allows for arbitrarily nested allocation groups.
///
/// ## Changes to memory layout
///
/// In order to associate an allocation with the current allocation group, a small trailer is added to the requested
/// allocation layout, in the form of a pointer to the statistics for the allocation group. This allows updating the
/// statistics directly when an allocation is deallocated, without having to externally keep track of what group a given
/// allocation belongs to. These statistics are updated directly when the allocation is initially made, and when it is
/// deallocated.
///
/// This means that all requested allocations end up being at least one machine word larger: 4 bytes on 32-bit systems,
/// and 8 bytes on 64-bit systems. Allocations whose alignment exceeds the pointer size grow by more, since the extended
/// layout is padded back out to the original alignment: for example, on a 64-bit system, a request for 8 bytes with
/// 8-byte alignment grows to 16 bytes, while a request for 16 bytes with 16-byte alignment grows to 32 bytes.
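///
/// ## Usage
///
/// A minimal sketch of installing this allocator as the global allocator, wrapping the standard system allocator
/// (marked `ignore` since installing a global allocator is a per-binary decision):
///
/// ```rust,ignore
/// use std::alloc::System;
/// use memory_accounting::allocator::TrackingAllocator;
///
/// #[global_allocator]
/// static ALLOCATOR: TrackingAllocator<System> = TrackingAllocator::new(System);
/// ```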
pub struct TrackingAllocator<A> {
    allocator: A,
}

impl<A> TrackingAllocator<A> {
    /// Creates a new `TrackingAllocator` that wraps another allocator.
    ///
    /// The wrapped allocator is used to actually allocate and deallocate memory, while this allocator is responsible
    /// purely for tracking the allocations and deallocations themselves.
    pub const fn new(allocator: A) -> Self {
        Self { allocator }
    }
}

unsafe impl<A> GlobalAlloc for TrackingAllocator<A>
where
    A: GlobalAlloc,
{
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // Adjust the requested layout to fit our trailer and then try to allocate it.
        let (layout, trailer_start) = get_layout_with_group_trailer(layout);
        let layout_size = layout.size();
        let ptr = self.allocator.alloc(layout);
        if ptr.is_null() {
            return ptr;
        }

        // Store the pointer to the current allocation group in the trailer, and also update the statistics.
        let trailer_ptr = ptr.add(trailer_start) as *mut *mut AllocationStats;
        CURRENT_GROUP.with(|current_group| {
            let group_ptr = current_group.borrow();
            group_ptr.as_ref().track_allocation(layout_size);

            trailer_ptr.write(group_ptr.as_ptr());
        });

        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // Read the pointer to the owning allocation group from the trailer and update the statistics.
        let (layout, trailer_start) = get_layout_with_group_trailer(layout);
        let trailer_ptr = ptr.add(trailer_start) as *mut *mut AllocationStats;
        let group = (*trailer_ptr).as_ref().unwrap();
        group.track_deallocation(layout.size());

        // Deallocate the memory.
        self.allocator.dealloc(ptr, layout);
    }
}

/// Extends the given layout with a trailer that holds a pointer to the owning allocation group's statistics.
///
/// Returns the padded layout and the offset at which the trailer begins.
fn get_layout_with_group_trailer(layout: Layout) -> (Layout, usize) {
    let (new_layout, trailer_start) = layout.extend(STATS_LAYOUT).unwrap();
    (new_layout.pad_to_align(), trailer_start)
}

/// Statistics for an allocation group.
pub struct AllocationStats {
    allocated_bytes: AtomicUsize,
    allocated_objects: AtomicUsize,
    deallocated_bytes: AtomicUsize,
    deallocated_objects: AtomicUsize,
}

impl AllocationStats {
    const fn new() -> Self {
        Self {
            allocated_bytes: AtomicUsize::new(0),
            allocated_objects: AtomicUsize::new(0),
            deallocated_bytes: AtomicUsize::new(0),
            deallocated_objects: AtomicUsize::new(0),
        }
    }

    /// Gets a reference to the statistics of the root allocation group.
    fn root() -> &'static Self {
        &ROOT_GROUP
    }

    /// Returns `true` if the given group has allocated any memory at all.
    pub fn has_allocated(&self) -> bool {
        self.allocated_bytes.load(Relaxed) > 0
    }

    #[inline]
    fn track_allocation(&self, size: usize) {
        self.allocated_bytes.fetch_add(size, Relaxed);
        self.allocated_objects.fetch_add(1, Relaxed);
    }

    #[inline]
    fn track_deallocation(&self, size: usize) {
        self.deallocated_bytes.fetch_add(size, Relaxed);
        self.deallocated_objects.fetch_add(1, Relaxed);
    }

    /// Captures a snapshot of the current statistics based on the delta from a previous snapshot.
    ///
    /// This can be used to keep a single local snapshot of the last delta, and then both track the delta since that
    /// snapshot, as well as update the snapshot to the current statistics.
    ///
    /// Callers should generally create their snapshot via [`AllocationStatsSnapshot::empty`] and then use this method
    /// to get their snapshot delta, utilize those delta values in whatever way is necessary, and then merge the
    /// snapshot delta into the primary snapshot via [`AllocationStatsSnapshot::merge`] to make it current.
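    ///
    /// # Examples
    ///
    /// A sketch of the intended workflow, periodically reporting on a single registered group (the group name is
    /// illustrative):
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationGroupRegistry, AllocationStatsSnapshot};
    ///
    /// let mut snapshot = AllocationStatsSnapshot::empty();
    /// AllocationGroupRegistry::global().visit_allocation_groups(|name, stats| {
    ///     if name == "my-group" {
    ///         let delta = stats.snapshot_delta(&snapshot);
    ///         // Report the delta values, then fold them back in so that `snapshot`
    ///         // reflects the current statistics for the next round.
    ///         println!("my-group: +{} bytes", delta.allocated_bytes);
    ///         snapshot.merge(&delta);
    ///     }
    /// });
    /// ```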
    pub fn snapshot_delta(&self, previous: &AllocationStatsSnapshot) -> AllocationStatsSnapshot {
        AllocationStatsSnapshot {
            allocated_bytes: self.allocated_bytes.load(Relaxed) - previous.allocated_bytes,
            allocated_objects: self.allocated_objects.load(Relaxed) - previous.allocated_objects,
            deallocated_bytes: self.deallocated_bytes.load(Relaxed) - previous.deallocated_bytes,
            deallocated_objects: self.deallocated_objects.load(Relaxed) - previous.deallocated_objects,
        }
    }
}

/// Snapshot of allocation statistics for a group.
pub struct AllocationStatsSnapshot {
    /// Number of allocated bytes since the last snapshot.
    pub allocated_bytes: usize,

    /// Number of allocated objects since the last snapshot.
    pub allocated_objects: usize,

    /// Number of deallocated bytes since the last snapshot.
    pub deallocated_bytes: usize,

    /// Number of deallocated objects since the last snapshot.
    pub deallocated_objects: usize,
}

impl AllocationStatsSnapshot {
    /// Creates an empty `AllocationStatsSnapshot`.
    pub const fn empty() -> Self {
        Self {
            allocated_bytes: 0,
            allocated_objects: 0,
            deallocated_bytes: 0,
            deallocated_objects: 0,
        }
    }

    /// Returns the number of live allocated bytes.
    pub fn live_bytes(&self) -> usize {
        self.allocated_bytes - self.deallocated_bytes
    }

    /// Returns the number of live allocated objects.
    pub fn live_objects(&self) -> usize {
        self.allocated_objects - self.deallocated_objects
    }

    /// Merges `other` into `self`.
    ///
    /// This can be used to accumulate the total number of (de)allocated bytes and objects when handling the deltas
    /// generated from [`AllocationStats::snapshot_delta`].
    pub fn merge(&mut self, other: &Self) {
        self.allocated_bytes += other.allocated_bytes;
        self.allocated_objects += other.allocated_objects;
        self.deallocated_bytes += other.deallocated_bytes;
        self.deallocated_objects += other.deallocated_objects;
    }
}

/// A token associated with a specific allocation group.
///
/// Used to attribute allocations and deallocations to a specific group with a scope guard [`TrackingGuard`], or
/// through helpers provided by the [`Track`] trait.
#[derive(Clone, Copy)]
pub struct AllocationGroupToken {
    group_ptr: NonNull<AllocationStats>,
}

impl AllocationGroupToken {
    fn new(group_ptr: NonNull<AllocationStats>) -> Self {
        Self { group_ptr }
    }

    /// Returns an `AllocationGroupToken` for the current allocation group.
    pub fn current() -> Self {
        CURRENT_GROUP.with(|current_group| {
            let group_ptr = current_group.borrow();
            Self::new(*group_ptr)
        })
    }

    #[cfg(test)]
    fn ptr_eq(&self, other: &Self) -> bool {
        self.group_ptr == other.group_ptr
    }

    /// Returns the token for the root allocation group.
    pub(crate) fn root() -> Self {
        Self::new(NonNull::from(&ROOT_GROUP))
    }

    /// Enters this allocation group, returning a guard that will exit the allocation group when dropped.
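    ///
    /// # Examples
    ///
    /// A short sketch of nesting groups (group names are illustrative):
    ///
    /// ```rust
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// let registry = AllocationGroupRegistry::global();
    /// let outer = registry.register_allocation_group("outer");
    /// let inner = registry.register_allocation_group("inner");
    ///
    /// let _outer_guard = outer.enter();
    /// // Allocations on this thread are now attributed to "outer".
    /// {
    ///     let _inner_guard = inner.enter();
    ///     // ...and now to "inner".
    /// }
    /// // Dropping the inner guard restores "outer".
    /// ```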
    pub fn enter(&self) -> TrackingGuard<'_> {
        // Swap the current group to the one we're tracking.
        CURRENT_GROUP.with(|current_group| {
            let mut group_ptr = current_group.borrow_mut();
            let previous_group_ptr = *group_ptr;
            *group_ptr = self.group_ptr;

            TrackingGuard {
                previous_group_ptr,
                _token: PhantomData,
            }
        })
    }
}

// SAFETY: There's nothing inherently thread-specific about the token.
unsafe impl Send for AllocationGroupToken {}

// SAFETY: There's nothing unsafe about sharing the token between threads, as it's safe to enter the same token on
// multiple threads at the same time, and the token itself has no internal state or interior mutability.
unsafe impl Sync for AllocationGroupToken {}

/// A guard representing an allocation group which has been entered.
///
/// When the guard is dropped, the allocation group will be exited and the previously entered
/// allocation group will be restored.
///
/// This is returned by the [`AllocationGroupToken::enter`] method.
pub struct TrackingGuard<'a> {
    previous_group_ptr: NonNull<AllocationStats>,
    _token: PhantomData<&'a AllocationGroupToken>,
}

impl Drop for TrackingGuard<'_> {
    fn drop(&mut self) {
        // Reset the current group to the one that existed before we entered.
        CURRENT_GROUP.with(|current_group| {
            let mut group_ptr = current_group.borrow_mut();
            *group_ptr = self.previous_group_ptr;
        });
    }
}

/// An object wrapper that tracks allocations and attributes them to a specific group.
///
/// Provides methods and implementations to help ensure that operations against/using the wrapped object have all
/// allocations properly tracked and attributed to a given group.
///
/// Implements [`Future`] when the wrapped object itself implements [`Future`].
//
// TODO: A more complete example of this sort of thing is `tracing::Instrumented`, where they also have some fancy code
// to trace execution even in the drop logic of the wrapped future. I'm not sure we need that here, because we don't
// care about what components an object is deallocated in, and I don't think we expect to have any futures where the
// drop logic actually _allocates_, and certainly not in a way where we want to attribute it to that future's attached
// component... but for posterity, I'm mentioning it here since we _might_ consider doing it. Might.
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Tracked<Inner> {
    token: AllocationGroupToken,

    #[pin]
    inner: Inner,
}

impl<Inner> Tracked<Inner> {
    /// Consumes this object and returns the inner object and tracking token.
    pub fn into_parts(self) -> (AllocationGroupToken, Inner) {
        (self.token, self.inner)
    }
}

impl<Inner> Future for Tracked<Inner>
where
    Inner: Future,
{
    type Output = Inner::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let _enter = this.token.enter();
        this.inner.poll(cx)
    }
}

/// Attaches allocation groups to a [`Future`].
pub trait Track: Sized {
    /// Instruments this type by attaching the given allocation group token, returning a `Tracked` wrapper.
    ///
    /// The allocation group will be entered every time the wrapped future is polled.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _};
    ///
    /// # async fn doc() {
    /// let future = async {
    ///     // All allocations in this future will be attached to the allocation group
    ///     // represented by `token`...
    /// };
    ///
    /// let token = AllocationGroupRegistry::global().register_allocation_group("my-group");
    /// future
    ///     .track_allocations(token)
    ///     .await
    /// # }
    /// ```
    fn track_allocations(self, token: AllocationGroupToken) -> Tracked<Self> {
        Tracked { token, inner: self }
    }

    /// Instruments this type by attaching the current allocation group, returning a `Tracked` wrapper.
    ///
    /// The allocation group will be entered every time the wrapped future is polled.
    ///
    /// This can be used to propagate the current allocation group when spawning a new future.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _};
    ///
    /// # mod tokio {
    /// #     pub(super) fn spawn(_: impl std::future::Future) {}
    /// # }
    /// # async fn doc() {
    /// let token = AllocationGroupRegistry::global().register_allocation_group("my-group");
    /// let _enter = token.enter();
    ///
    /// // ...
    ///
    /// let future = async {
    ///     // All allocations in this future will be attached to the allocation group
    ///     // represented by `token`...
    /// };
    /// tokio::spawn(future.in_current_allocation_group());
    /// # }
    /// ```
    fn in_current_allocation_group(self) -> Tracked<Self> {
        Tracked {
            token: AllocationGroupToken::current(),
            inner: self,
        }
    }
}

impl<T: Sized> Track for T {}

/// A registry of allocation groups and the statistics for each of them.
pub struct AllocationGroupRegistry {
    allocation_groups: Mutex<HashMap<String, Box<AllocationStats>>>,
}

impl AllocationGroupRegistry {
    fn new() -> Self {
        in_root_allocation_group(|| Self {
            allocation_groups: Mutex::new(HashMap::with_capacity(4)),
        })
    }

    /// Gets a reference to the global allocation group registry.
    pub fn global() -> &'static Self {
        REGISTRY.get_or_init(Self::new)
    }

    /// Returns `true` if `TrackingAllocator` is installed as the global allocator.
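    ///
    /// # Examples
    ///
    /// A sketch of gating statistics reporting on the allocator actually being in use (marked `ignore` since the
    /// result depends on the binary's global allocator):
    ///
    /// ```rust,ignore
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// if AllocationGroupRegistry::allocator_installed() {
    ///     // Per-group statistics are being collected and are meaningful to report.
    /// }
    /// ```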
    pub fn allocator_installed() -> bool {
        // Essentially, when we load the group registry, and it gets created for the first time, it will specifically
        // allocate its internal data structures while entered into the root allocation group.
        //
        // This means that if the allocator is installed, we should always have some allocations in the root group by
        // the time we call `AllocationStats::has_allocated`.
        AllocationStats::root().has_allocated()
    }

    /// Registers a new allocation group with the given name.
    ///
    /// Returns an `AllocationGroupToken` that can be used to attribute allocations and deallocations to the
    /// newly-created allocation group. If a group has already been registered under the given name, a token for
    /// the existing group is returned instead.
    pub fn register_allocation_group<S>(&self, name: S) -> AllocationGroupToken
    where
        S: AsRef<str>,
    {
        in_root_allocation_group(|| {
            let mut allocation_groups = self.allocation_groups.lock().unwrap();
            match allocation_groups.get(name.as_ref()) {
                Some(stats) => AllocationGroupToken::new(NonNull::from(&**stats)),
                None => {
                    let allocation_group_stats = Box::new(AllocationStats::new());
                    let token = AllocationGroupToken::new(NonNull::from(&*allocation_group_stats));

                    allocation_groups.insert(name.as_ref().to_string(), allocation_group_stats);

                    token
                }
            }
        })
    }

    /// Visits all allocation groups in the registry and calls the given closure with their names and statistics.
    ///
    /// The root allocation group is always visited first, under the name `root`.
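    ///
    /// # Examples
    ///
    /// A short sketch of enumerating all registered groups:
    ///
    /// ```rust
    /// use memory_accounting::allocator::AllocationGroupRegistry;
    ///
    /// AllocationGroupRegistry::global().visit_allocation_groups(|name, stats| {
    ///     println!("group '{}' has allocated memory: {}", name, stats.has_allocated());
    /// });
    /// ```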
    pub fn visit_allocation_groups<F>(&self, mut f: F)
    where
        F: FnMut(&str, &AllocationStats),
    {
        in_root_allocation_group(|| {
            f("root", &ROOT_GROUP);

            let allocation_groups = self.allocation_groups.lock().unwrap();
            for (name, stats) in allocation_groups.iter() {
                f(name, stats);
            }
        });
    }
}

/// Runs the given closure with the root allocation group entered, restoring the previous group afterwards.
fn in_root_allocation_group<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    let token = AllocationGroupToken::root();
    let _enter = token.enter();
    f()
}

#[cfg(test)]
mod tests {
    use super::AllocationGroupRegistry;

    #[test]
    fn existing_group() {
        let registry = AllocationGroupRegistry::new();
        let token = registry.register_allocation_group("test");
        let token2 = registry.register_allocation_group("test");
        let token3 = registry.register_allocation_group("test2");

        assert!(token.ptr_eq(&token2));
        assert!(!token.ptr_eq(&token3));
    }

    #[test]
    fn visit_allocation_groups() {
        let registry = AllocationGroupRegistry::new();
        let _token = registry.register_allocation_group("my-group");

        let mut visited = Vec::new();
        registry.visit_allocation_groups(|name, _stats| {
            visited.push(name.to_string());
        });

        assert_eq!(visited.len(), 2);
        assert_eq!(visited[0], "root");
        assert_eq!(visited[1], "my-group");
    }
}