Skip to main content

resource_accounting/
groups.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    future::Future,
5    marker::PhantomData,
6    pin::Pin,
7    ptr::NonNull,
8    sync::{Mutex, OnceLock},
9    task::{Context, Poll},
10};
11
12use pin_project::pin_project;
13
14use super::stats::thread_cpu_time_nanos;
15use crate::ResourceStats;
16
// Process-wide registry of resource groups, created lazily on first access through
// `ResourceGroupRegistry::global`.
static REGISTRY: OnceLock<ResourceGroupRegistry> = OnceLock::new();
// Statistics for the root resource group. Being a `static`, pointers to it are valid for the whole
// lifetime of the process.
static ROOT_GROUP: ResourceStats = ResourceStats::new();

thread_local! {
    // Pointer to the statistics of the resource group currently entered on this thread. Every thread
    // starts out in the root group; `ResourceGroupToken::enter` swaps the pointer and the returned
    // guard restores the previous value when dropped.
    pub(super) static CURRENT_GROUP: RefCell<NonNull<ResourceStats>> = RefCell::new(NonNull::from(&ROOT_GROUP));
}
23
/// A token associated with a specific resource group.
///
/// Used to attribute allocations, deallocations and CPU time to a specific group with a scope guard
/// ([`ResourceTrackingGuard`], obtained from [`ResourceGroupToken::enter`]), or through helpers provided by the
/// [`Track`] trait.
///
/// The token is a thin, copyable handle: it only carries a pointer to the statistics of its group and no other
/// state.
#[derive(Clone, Copy)]
pub struct ResourceGroupToken {
    // Pointer to the statistics of the group this token refers to. It carries no lifetime, so the pointee is
    // expected to stay alive for as long as any copy of the token may be entered.
    group_ptr: NonNull<ResourceStats>,
}
32
33impl ResourceGroupToken {
34    fn new(group_ptr: NonNull<ResourceStats>) -> Self {
35        Self { group_ptr }
36    }
37
38    /// Returns an `ResourceGroupToken` for the current resource group.
39    pub fn current() -> Self {
40        CURRENT_GROUP.with(|current_group| {
41            let group_ptr = current_group.borrow();
42            Self::new(*group_ptr)
43        })
44    }
45
46    #[cfg(test)]
47    fn ptr_eq(&self, other: &Self) -> bool {
48        self.group_ptr == other.group_ptr
49    }
50
51    /// Returns the token for the root resource group.
52    pub(crate) fn root() -> Self {
53        Self::new(NonNull::from(&ROOT_GROUP))
54    }
55
56    /// Enters this resource group, returning a guard that will exit the resource group when dropped.
57    pub fn enter(&self) -> ResourceTrackingGuard<'_> {
58        // Track our starting point for this thread's CPU usage.
59        let thread_cpu_usage_start = thread_cpu_time_nanos().unwrap_or(0);
60
61        // Swap the current group to the one we're tracking.
62        CURRENT_GROUP.with(|current_group| {
63            let mut group_ptr = current_group.borrow_mut();
64            let previous_group_ptr = *group_ptr;
65            *group_ptr = self.group_ptr;
66
67            ResourceTrackingGuard {
68                previous_group_ptr,
69                thread_cpu_usage_start,
70                _token: PhantomData,
71            }
72        })
73    }
74}
75
// SAFETY: There's nothing inherently thread-specific about the token: it is only a pointer to the group's
// statistics, and the thread-local bookkeeping lives in `CURRENT_GROUP`, not in the token.
// NOTE(review): this relies on `ResourceStats` being safe to access from any thread -- it is already used as a
// `static` (`ROOT_GROUP`), which requires `Sync`.
unsafe impl Send for ResourceGroupToken {}

// SAFETY: There's nothing unsafe about sharing the token between threads, as it's safe to enter the same token on
// multiple threads at the same time, and the token itself has no internal state or interior mutability.
unsafe impl Sync for ResourceGroupToken {}
82
83/// A guard representing an resource group which has been entered.
84///
85/// When the guard is dropped, the resource group will be exited and the previously entered resource group will be
86/// restored.
87///
88/// This is returned by the [`ResourceGroupToken::enter`] method.
89pub struct ResourceTrackingGuard<'a> {
90    previous_group_ptr: NonNull<ResourceStats>,
91    thread_cpu_usage_start: u64,
92    _token: PhantomData<&'a ResourceGroupToken>,
93}
94
95impl Drop for ResourceTrackingGuard<'_> {
96    fn drop(&mut self) {
97        // Grab our current total CPU usage for the thread, and calculate the delta.
98        let thread_cpu_usage_end = thread_cpu_time_nanos().unwrap_or(0);
99        let cpu_usage_delta = thread_cpu_usage_end.saturating_sub(self.thread_cpu_usage_start);
100
101        // Reset the current group to the one that existed before we entered.
102        CURRENT_GROUP.with(|current_group| {
103            let mut group_ptr = current_group.borrow_mut();
104
105            // Now track the delta in CPU usage, if available, before resetting the group.
106            if cpu_usage_delta != 0 {
107                // SAFETY: We only construct the pointer to `ResourceStats` from a leaked heap allocation, and we never
108                // deallocate it, so it's always non-null/aligned/valid-for-`T`, etc.
109                unsafe { group_ptr.as_ref().track_cpu_time(cpu_usage_delta) }
110            }
111
112            *group_ptr = self.previous_group_ptr;
113        });
114    }
115}
116
/// An object wrapper that tracks resource usage and attributes it to a specific group.
///
/// Provides methods and implementations to help ensure that operations against/using the wrapped object have all
/// allocations properly tracked and attributed to a given group.
///
/// Implements [`Future`] when the wrapped object itself implements [`Future`]; the attached group is entered for the
/// duration of every poll.
//
// TODO: A more complete example of this sort of thing is `tracing::Instrumented`, where they also have some fancy code
// to trace execution even in the drop logic of the wrapped future. I'm not sure we need that here, because we don't
// care about what components an object is deallocated in, and I don't think we expect to have any futures where the
// drop logic actually _allocates_, and certainly not in a way where we want to attribute it to that future's attached
// component... but for posterity, I'm mentioning it here since we _might_ consider doing it. Might.
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Tracked<Inner> {
    // Group that is entered whenever the wrapped object is polled.
    token: ResourceGroupToken,

    // The wrapped object; structurally pinned so that an inner future can be polled in place.
    #[pin]
    inner: Inner,
}
137
138impl<Inner> Tracked<Inner> {
139    /// Consumes this object and returns the inner object and tracking token.
140    pub fn into_parts(self) -> (ResourceGroupToken, Inner) {
141        (self.token, self.inner)
142    }
143}
144
145impl<Inner> Future for Tracked<Inner>
146where
147    Inner: Future,
148{
149    type Output = Inner::Output;
150
151    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152        let this = self.project();
153        let _enter = this.token.enter();
154
155        this.inner.poll(cx)
156    }
157}
158
159/// Attaches resource groups to a [`Future`].
160pub trait Track: Sized {
161    /// Instruments this type by attaching the given resource group token, returning a `Tracked` wrapper.
162    ///
163    /// The resource group will be entered every time the wrapped future is polled.
164    ///
165    /// # Examples
166    ///
167    /// ```rust
168    /// use resource_accounting::{ResourceGroupRegistry, ResourceGroupToken, Track as _};
169    ///
170    /// # async fn doc() {
171    /// let future = async {
172    ///     // All allocations in this future will be attached to the resource group
173    ///     // represented by `token`...
174    /// };
175    ///
176    /// let token = ResourceGroupRegistry::global().register_resource_group("my-group");
177    /// future
178    ///     .track_resources(token)
179    ///     .await
180    /// # }
181    fn track_resources(self, token: ResourceGroupToken) -> Tracked<Self> {
182        Tracked { token, inner: self }
183    }
184
185    /// Instruments this type by attaching the current resource group, returning a `Tracked` wrapper.
186    ///
187    /// The resource group will be entered every time the wrapped future is polled.
188    ///
189    /// This can be used to propagate the current resource group when spawning a new future.
190    ///
191    /// # Examples
192    ///
193    /// ```rust
194    /// use resource_accounting::{ResourceGroupRegistry, ResourceGroupToken, Track as _};
195    ///
196    /// # mod tokio {
197    /// #     pub(super) fn spawn(_: impl std::future::Future) {}
198    /// # }
199    /// # async fn doc() {
200    /// let token = ResourceGroupRegistry::global().register_resource_group("my-group");
201    /// let _enter = token.enter();
202    ///
203    /// // ...
204    ///
205    /// let future = async {
206    ///     // All allocations in this future will be attached to the resource group
207    ///     // represented by `token`...
208    /// };
209    /// tokio::spawn(future.in_current_resource_group());
210    /// # }
211    /// ```
212    fn in_current_resource_group(self) -> Tracked<Self> {
213        Tracked {
214            token: ResourceGroupToken::current(),
215            inner: self,
216        }
217    }
218}
219
220impl<T: Sized> Track for T {}
221
222/// A registry of resource groups and the statistics for each of them.
223///
224/// Resource groups are user-defined groups which can then be associated with memory and CPU usage in distinct code
225/// regions. This mechanism allows for granular resource accounting at the level which makes sense to the application,
226/// such as per-thread, per-async task, and so on.
227///
228/// # Token guard
229///
230/// When an resource group is registered, an `ResourceGroupToken` is returned. This token can be used to "enter" the
231/// group, which causes memory and CPU usage on the current thread to be attributed to that group. Entering the group
232/// returns a drop guard that restores the previously entered group when dropped.
233///
234/// This allows for arbitrarily nested resource groups.
235///
236/// Additionally, [`Tracked`] can be used to wrap a [`Future`], attaching it to a specific resource group token. This
237/// causes the future to track all memory and CPU usage during polls such that the usage is properly attributed to the
238/// resource group.
239///
240/// # Resources tracked
241///
242/// ## Memory usage
243///
244/// In order for memory usage to be tracked, [`TrackingAllocator`][crate::allocator::TrackingAllocator] must be installed
245/// as the global allocator for the process.
246///
247/// ## CPU usage
248///
249/// CPU usage is automatically tracked if platform support is detected.
250///
251/// Currently, only Linux is supported for CPU usage tracking.
252pub struct ResourceGroupRegistry {
253    resource_groups: Mutex<HashMap<String, Box<ResourceStats>>>,
254}
255
256impl ResourceGroupRegistry {
257    fn new() -> Self {
258        in_root_resource_group(|| Self {
259            resource_groups: Mutex::new(HashMap::with_capacity(4)),
260        })
261    }
262
263    /// Gets a reference to the global resource group registry.
264    pub fn global() -> &'static Self {
265        REGISTRY.get_or_init(Self::new)
266    }
267
268    /// Returns `true` if `TrackingAllocator` is installed as the global allocator.
269    pub fn allocator_installed() -> bool {
270        // Essentially, when we load the group registry, and it gets created for the first time, it will specifically
271        // allocate its internal data structures while entered into the root resource group.
272        //
273        // This means that if the allocator is installed, we should always have some allocations in the root group by
274        // the time we call `ResourceStats::has_allocated`.
275        ROOT_GROUP.has_allocated()
276    }
277
278    /// Registers a new resource group with the given name.
279    ///
280    /// Returns an `ResourceGroupToken` that can be used to attribute CPU and memory usage to the
281    /// newly created resource group.
282    pub fn register_resource_group<S>(&self, name: S) -> ResourceGroupToken
283    where
284        S: AsRef<str>,
285    {
286        in_root_resource_group(|| {
287            let mut resource_groups = self.resource_groups.lock().unwrap();
288            match resource_groups.get(name.as_ref()) {
289                Some(stats) => ResourceGroupToken::new(NonNull::from(&**stats)),
290                None => {
291                    let resource_group_stats = Box::new(ResourceStats::new());
292                    let token = ResourceGroupToken::new(NonNull::from(&*resource_group_stats));
293
294                    resource_groups.insert(name.as_ref().to_string(), resource_group_stats);
295
296                    token
297                }
298            }
299        })
300    }
301
302    /// Visits all resource groups in the registry and calls the given closure with their names and statistics.
303    pub fn visit_resource_groups<F>(&self, mut f: F)
304    where
305        F: FnMut(&str, &ResourceStats),
306    {
307        in_root_resource_group(|| {
308            f("root", &ROOT_GROUP);
309
310            let resource_groups = self.resource_groups.lock().unwrap();
311            for (name, stats) in resource_groups.iter() {
312                f(name, stats);
313            }
314        });
315    }
316}
317
318fn in_root_resource_group<F, R>(f: F) -> R
319where
320    F: FnOnce() -> R,
321{
322    let token = ResourceGroupToken::root();
323    let _enter = token.enter();
324    f()
325}
326
#[cfg(test)]
mod tests {
    use super::ResourceGroupRegistry;

    /// Registering the same name twice yields the same group, while a different name yields a different group.
    #[test]
    fn existing_group() {
        let registry = ResourceGroupRegistry::new();
        let first = registry.register_resource_group("test");
        let again = registry.register_resource_group("test");
        let other = registry.register_resource_group("test2");

        assert!(first.ptr_eq(&again));
        assert!(!first.ptr_eq(&other));
    }

    /// Visiting reports the root group first, followed by the single registered group.
    #[test]
    fn visit_resource_groups() {
        let registry = ResourceGroupRegistry::new();
        let _token = registry.register_resource_group("my-group");

        let mut visited: Vec<String> = Vec::new();
        registry.visit_resource_groups(|name, _stats| visited.push(name.to_owned()));

        assert_eq!(visited, ["root", "my-group"]);
    }
}