resource_accounting/groups.rs
1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 future::Future,
5 marker::PhantomData,
6 pin::Pin,
7 ptr::NonNull,
8 sync::{Mutex, OnceLock},
9 task::{Context, Poll},
10};
11
12use pin_project::pin_project;
13
14use super::stats::thread_cpu_time_nanos;
15use crate::ResourceStats;
16
17static REGISTRY: OnceLock<ResourceGroupRegistry> = OnceLock::new();
18static ROOT_GROUP: ResourceStats = ResourceStats::new();
19
20thread_local! {
21 pub(super) static CURRENT_GROUP: RefCell<NonNull<ResourceStats>> = RefCell::new(NonNull::from(&ROOT_GROUP));
22}
23
24/// A token associated with a specific resource group.
25///
26/// Used to attribute allocations and deallocations to a specific group with a scope guard [`ResourceTrackingGuard`], or
27/// through helpers provided by the [`Track`] trait.
28#[derive(Clone, Copy)]
29pub struct ResourceGroupToken {
30 group_ptr: NonNull<ResourceStats>,
31}
32
33impl ResourceGroupToken {
34 fn new(group_ptr: NonNull<ResourceStats>) -> Self {
35 Self { group_ptr }
36 }
37
38 /// Returns an `ResourceGroupToken` for the current resource group.
39 pub fn current() -> Self {
40 CURRENT_GROUP.with(|current_group| {
41 let group_ptr = current_group.borrow();
42 Self::new(*group_ptr)
43 })
44 }
45
46 #[cfg(test)]
47 fn ptr_eq(&self, other: &Self) -> bool {
48 self.group_ptr == other.group_ptr
49 }
50
51 /// Returns the token for the root resource group.
52 pub(crate) fn root() -> Self {
53 Self::new(NonNull::from(&ROOT_GROUP))
54 }
55
56 /// Enters this resource group, returning a guard that will exit the resource group when dropped.
57 pub fn enter(&self) -> ResourceTrackingGuard<'_> {
58 // Track our starting point for this thread's CPU usage.
59 let thread_cpu_usage_start = thread_cpu_time_nanos().unwrap_or(0);
60
61 // Swap the current group to the one we're tracking.
62 CURRENT_GROUP.with(|current_group| {
63 let mut group_ptr = current_group.borrow_mut();
64 let previous_group_ptr = *group_ptr;
65 *group_ptr = self.group_ptr;
66
67 ResourceTrackingGuard {
68 previous_group_ptr,
69 thread_cpu_usage_start,
70 _token: PhantomData,
71 }
72 })
73 }
74}
75
76// SAFETY: There's nothing inherently thread-specific about the token.
77unsafe impl Send for ResourceGroupToken {}
78
79// SAFETY: There's nothing unsafe about sharing the token between threads, as it's safe to enter the same token on
80// multiple threads at the same time, and the token itself has no internal state or interior mutability.
81unsafe impl Sync for ResourceGroupToken {}
82
83/// A guard representing an resource group which has been entered.
84///
85/// When the guard is dropped, the resource group will be exited and the previously entered resource group will be
86/// restored.
87///
88/// This is returned by the [`ResourceGroupToken::enter`] method.
89pub struct ResourceTrackingGuard<'a> {
90 previous_group_ptr: NonNull<ResourceStats>,
91 thread_cpu_usage_start: u64,
92 _token: PhantomData<&'a ResourceGroupToken>,
93}
94
95impl Drop for ResourceTrackingGuard<'_> {
96 fn drop(&mut self) {
97 // Grab our current total CPU usage for the thread, and calculate the delta.
98 let thread_cpu_usage_end = thread_cpu_time_nanos().unwrap_or(0);
99 let cpu_usage_delta = thread_cpu_usage_end.saturating_sub(self.thread_cpu_usage_start);
100
101 // Reset the current group to the one that existed before we entered.
102 CURRENT_GROUP.with(|current_group| {
103 let mut group_ptr = current_group.borrow_mut();
104
105 // Now track the delta in CPU usage, if available, before resetting the group.
106 if cpu_usage_delta != 0 {
107 // SAFETY: We only construct the pointer to `ResourceStats` from a leaked heap allocation, and we never
108 // deallocate it, so it's always non-null/aligned/valid-for-`T`, etc.
109 unsafe { group_ptr.as_ref().track_cpu_time(cpu_usage_delta) }
110 }
111
112 *group_ptr = self.previous_group_ptr;
113 });
114 }
115}
116
117/// An object wrapper that tracks allocations and attributes them to a specific group.
118///
119/// Provides methods and implementations to help ensure that operations against/using the wrapped object have all
120/// allocations properly tracked and attributed to a given group.
121///
122/// Implements [`Future`] when the wrapped object itself implements [`Future`].
123//
124// TODO: A more complete example of this sort of thing is `tracing::Instrumented`, where they also have some fancy code
125// to trace execution even in the drop logic of the wrapped future. I'm not sure we need that here, because we don't
126// care about what components an object is deallocated in, and I don't think we expect to have any futures where the
127// drop logic actually _allocates_, and certainly not in a way where we want to attribute it to that future's attached
128// component... but for posterity, I'm mentioning it here since we _might_ consider doing it. Might.
129#[pin_project]
130#[must_use = "futures do nothing unless you `.await` or poll them"]
131pub struct Tracked<Inner> {
132 token: ResourceGroupToken,
133
134 #[pin]
135 inner: Inner,
136}
137
138impl<Inner> Tracked<Inner> {
139 /// Consumes this object and returns the inner object and tracking token.
140 pub fn into_parts(self) -> (ResourceGroupToken, Inner) {
141 (self.token, self.inner)
142 }
143}
144
145impl<Inner> Future for Tracked<Inner>
146where
147 Inner: Future,
148{
149 type Output = Inner::Output;
150
151 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
152 let this = self.project();
153 let _enter = this.token.enter();
154
155 this.inner.poll(cx)
156 }
157}
158
159/// Attaches resource groups to a [`Future`].
160pub trait Track: Sized {
161 /// Instruments this type by attaching the given resource group token, returning a `Tracked` wrapper.
162 ///
163 /// The resource group will be entered every time the wrapped future is polled.
164 ///
165 /// # Examples
166 ///
167 /// ```rust
168 /// use resource_accounting::{ResourceGroupRegistry, ResourceGroupToken, Track as _};
169 ///
170 /// # async fn doc() {
171 /// let future = async {
172 /// // All allocations in this future will be attached to the resource group
173 /// // represented by `token`...
174 /// };
175 ///
176 /// let token = ResourceGroupRegistry::global().register_resource_group("my-group");
177 /// future
178 /// .track_resources(token)
179 /// .await
180 /// # }
181 fn track_resources(self, token: ResourceGroupToken) -> Tracked<Self> {
182 Tracked { token, inner: self }
183 }
184
185 /// Instruments this type by attaching the current resource group, returning a `Tracked` wrapper.
186 ///
187 /// The resource group will be entered every time the wrapped future is polled.
188 ///
189 /// This can be used to propagate the current resource group when spawning a new future.
190 ///
191 /// # Examples
192 ///
193 /// ```rust
194 /// use resource_accounting::{ResourceGroupRegistry, ResourceGroupToken, Track as _};
195 ///
196 /// # mod tokio {
197 /// # pub(super) fn spawn(_: impl std::future::Future) {}
198 /// # }
199 /// # async fn doc() {
200 /// let token = ResourceGroupRegistry::global().register_resource_group("my-group");
201 /// let _enter = token.enter();
202 ///
203 /// // ...
204 ///
205 /// let future = async {
206 /// // All allocations in this future will be attached to the resource group
207 /// // represented by `token`...
208 /// };
209 /// tokio::spawn(future.in_current_resource_group());
210 /// # }
211 /// ```
212 fn in_current_resource_group(self) -> Tracked<Self> {
213 Tracked {
214 token: ResourceGroupToken::current(),
215 inner: self,
216 }
217 }
218}
219
220impl<T: Sized> Track for T {}
221
222/// A registry of resource groups and the statistics for each of them.
223///
224/// Resource groups are user-defined groups which can then be associated with memory and CPU usage in distinct code
225/// regions. This mechanism allows for granular resource accounting at the level which makes sense to the application,
226/// such as per-thread, per-async task, and so on.
227///
228/// # Token guard
229///
230/// When an resource group is registered, an `ResourceGroupToken` is returned. This token can be used to "enter" the
231/// group, which causes memory and CPU usage on the current thread to be attributed to that group. Entering the group
232/// returns a drop guard that restores the previously entered group when dropped.
233///
234/// This allows for arbitrarily nested resource groups.
235///
236/// Additionally, [`Tracked`] can be used to wrap a [`Future`], attaching it to a specific resource group token. This
237/// causes the future to track all memory and CPU usage during polls such that the usage is properly attributed to the
238/// resource group.
239///
240/// # Resources tracked
241///
242/// ## Memory usage
243///
244/// In order for memory usage to be tracked, [`TrackingAllocator`][crate::allocator::TrackingAllocator] must be installed
245/// as the global allocator for the process.
246///
247/// ## CPU usage
248///
249/// CPU usage is automatically tracked if platform support is detected.
250///
251/// Currently, only Linux is supported for CPU usage tracking.
252pub struct ResourceGroupRegistry {
253 resource_groups: Mutex<HashMap<String, Box<ResourceStats>>>,
254}
255
256impl ResourceGroupRegistry {
257 fn new() -> Self {
258 in_root_resource_group(|| Self {
259 resource_groups: Mutex::new(HashMap::with_capacity(4)),
260 })
261 }
262
263 /// Gets a reference to the global resource group registry.
264 pub fn global() -> &'static Self {
265 REGISTRY.get_or_init(Self::new)
266 }
267
268 /// Returns `true` if `TrackingAllocator` is installed as the global allocator.
269 pub fn allocator_installed() -> bool {
270 // Essentially, when we load the group registry, and it gets created for the first time, it will specifically
271 // allocate its internal data structures while entered into the root resource group.
272 //
273 // This means that if the allocator is installed, we should always have some allocations in the root group by
274 // the time we call `ResourceStats::has_allocated`.
275 ROOT_GROUP.has_allocated()
276 }
277
278 /// Registers a new resource group with the given name.
279 ///
280 /// Returns an `ResourceGroupToken` that can be used to attribute CPU and memory usage to the
281 /// newly created resource group.
282 pub fn register_resource_group<S>(&self, name: S) -> ResourceGroupToken
283 where
284 S: AsRef<str>,
285 {
286 in_root_resource_group(|| {
287 let mut resource_groups = self.resource_groups.lock().unwrap();
288 match resource_groups.get(name.as_ref()) {
289 Some(stats) => ResourceGroupToken::new(NonNull::from(&**stats)),
290 None => {
291 let resource_group_stats = Box::new(ResourceStats::new());
292 let token = ResourceGroupToken::new(NonNull::from(&*resource_group_stats));
293
294 resource_groups.insert(name.as_ref().to_string(), resource_group_stats);
295
296 token
297 }
298 }
299 })
300 }
301
302 /// Visits all resource groups in the registry and calls the given closure with their names and statistics.
303 pub fn visit_resource_groups<F>(&self, mut f: F)
304 where
305 F: FnMut(&str, &ResourceStats),
306 {
307 in_root_resource_group(|| {
308 f("root", &ROOT_GROUP);
309
310 let resource_groups = self.resource_groups.lock().unwrap();
311 for (name, stats) in resource_groups.iter() {
312 f(name, stats);
313 }
314 });
315 }
316}
317
318fn in_root_resource_group<F, R>(f: F) -> R
319where
320 F: FnOnce() -> R,
321{
322 let token = ResourceGroupToken::root();
323 let _enter = token.enter();
324 f()
325}
326
327#[cfg(test)]
328mod tests {
329 use super::ResourceGroupRegistry;
330
331 #[test]
332 fn existing_group() {
333 let registry = ResourceGroupRegistry::new();
334 let token = registry.register_resource_group("test");
335 let token2 = registry.register_resource_group("test");
336 let token3 = registry.register_resource_group("test2");
337
338 assert!(token.ptr_eq(&token2));
339 assert!(!token.ptr_eq(&token3));
340 }
341
342 #[test]
343 fn visit_resource_groups() {
344 let registry = ResourceGroupRegistry::new();
345 let _token = registry.register_resource_group("my-group");
346
347 let mut visited = Vec::new();
348 registry.visit_resource_groups(|name, _stats| {
349 visited.push(name.to_string());
350 });
351
352 assert_eq!(visited.len(), 2);
353 assert_eq!(visited[0], "root");
354 assert_eq!(visited[1], "my-group");
355 }
356}