Skip to main content

resource_accounting/
registry.rs

1use std::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9    api::ResourceAPIHandler, BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, ResourceGroupRegistry,
10    ResourceGroupToken, VerifiedBounds, VerifierError,
11};
12
13pub(crate) struct ComponentMetadata {
14    full_name: Option<String>,
15    bounds: ComponentBounds,
16    token: Option<ResourceGroupToken>,
17    subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
18}
19
20impl ComponentMetadata {
21    fn from_full_name(full_name: Option<String>) -> Self {
22        Self {
23            full_name,
24            bounds: ComponentBounds::default(),
25            token: None,
26            subcomponents: HashMap::new(),
27        }
28    }
29
30    pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
31    where
32        S: AsRef<str>,
33    {
34        // Split the name into the current level name and the remaining name.
35        //
36        // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
37        // together.
38        let name = name.as_ref();
39        let (current_level_name, remaining_name) = match name.split_once('.') {
40            Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
41            None => (name, None),
42        };
43
44        // Now we need to see if we have an existing component here or if we need to create a new one.
45        match self.subcomponents.get(current_level_name) {
46            Some(existing) => match remaining_name {
47                Some(remaining_name) => {
48                    // We found an intermediate subcomponent, so keep recursing.
49                    existing.lock().unwrap().get_or_create(remaining_name)
50                }
51                None => {
52                    // We've found the leaf subcomponent.
53                    Arc::clone(existing)
54                }
55            },
56            None => {
57                // We couldn't find the component at this level, so we need to create it.
58                //
59                // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
60                // avoid registering intermediate components that aren't actually used by the code, but are simply a
61                // consequence of wanting to having a nicely nested structure.
62                //
63                // We'll register a token for the component the first time it's requested.
64                let full_name = match self.full_name.as_ref() {
65                    Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
66                    None => current_level_name.to_string(),
67                };
68
69                let inner = self
70                    .subcomponents
71                    .entry(current_level_name.to_string())
72                    .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
73
74                // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
75                // as-is.
76                match remaining_name {
77                    Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
78                    None => Arc::clone(inner),
79                }
80            }
81        }
82    }
83
84    fn token(&mut self) -> ResourceGroupToken {
85        match self.token {
86            Some(token) => token,
87            None => match self.full_name.as_deref() {
88                Some(full_name) => {
89                    let allocator_component_registry = ResourceGroupRegistry::global();
90                    let token = allocator_component_registry.register_resource_group(full_name);
91                    self.token = Some(token);
92
93                    token
94                }
95                None => ResourceGroupToken::root(),
96            },
97        }
98    }
99
100    fn reset(&mut self) {
101        self.bounds = ComponentBounds::default();
102        self.subcomponents.clear();
103    }
104
105    pub fn self_bounds(&self) -> &ComponentBounds {
106        &self.bounds
107    }
108
109    pub fn as_bounds(&self) -> ComponentBounds {
110        let mut bounds = ComponentBounds::default();
111        bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
112        bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
113
114        for (name, subcomponent) in self.subcomponents.iter() {
115            let subcomponent = subcomponent.lock().unwrap();
116            let subcomponent_bounds = subcomponent.as_bounds();
117            bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
118        }
119
120        bounds
121    }
122}
123
124/// A registry for components for tracking memory bounds and runtime memory usage.
125///
126/// This registry provides a unified interface for declaring the memory bounds of a _component_, as well as registering
127/// that component for runtime memory usage tracking when using the tracking allocator implementation in `resource-accounting`.
128///
129/// ## Components
130///
131/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
132///
133/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
134/// forward data. The topology itself could be considered a component, and each individual source, transform, and
135/// destination within it could be subcomponents of the topology.
136///
137/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat standalone,
138/// but this isn't an absolute requirement and components can be nested more granularly for organizational/aesthetic
139/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
140/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
141/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
142/// and so on.
143///
144/// ## Bounds
145///
146/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
147/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
148/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
149/// defined from and how they're nested.
150///
151/// ## Allocation tracking
152///
153/// Every component is also able to be registered with its own resource group when using the tracking allocator
154/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
155/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
156pub struct ComponentRegistry {
157    inner: Arc<Mutex<ComponentMetadata>>,
158    root: Option<Arc<Mutex<ComponentMetadata>>>,
159}
160
161impl ComponentRegistry {
162    fn get_root(&self) -> Arc<Mutex<ComponentMetadata>> {
163        match &self.root {
164            Some(root) => Arc::clone(root),
165            None => Arc::clone(&self.inner),
166        }
167    }
168
169    /// Creates a handle to this registry.
170    ///
171    /// The handle provides read-only access to root-level operations like creating API handlers and verifying bounds. It
172    /// can be freely cloned and shared.
173    pub fn root(&self) -> ComponentRegistryHandle {
174        ComponentRegistryHandle { inner: self.get_root() }
175    }
176
177    /// Gets a component by name, or creates it if it doesn't exist.
178    ///
179    /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
180    /// nested form is given, each component in the path will be created if it doesn't exist.
181    ///
182    /// Returns a `ComponentRegistry` scoped to the component.
183    pub fn get_or_create<S>(&self, name: S) -> Self
184    where
185        S: AsRef<str>,
186    {
187        let mut inner = self.inner.lock().unwrap();
188        Self {
189            inner: inner.get_or_create(name),
190            root: Some(self.get_root()),
191        }
192    }
193
194    /// Gets a bounds builder attached to the root component.
195    pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
196        MemoryBoundsBuilder {
197            inner: Self {
198                inner: Arc::clone(&self.inner),
199                root: Some(self.get_root()),
200            },
201            _lt: PhantomData,
202        }
203    }
204
205    /// Gets the tracking token for the component scoped to this registry.
206    ///
207    /// If the component is the root component (has no name), the root allocation token is returned. Otherwise, the
208    /// component is registered (using its full name) if it hasn't already been, and that token is returned.
209    pub fn token(&mut self) -> ResourceGroupToken {
210        let mut inner = self.inner.lock().unwrap();
211        inner.token()
212    }
213
214    /// Gets the total minimum required bytes for this component and all subcomponents.
215    pub fn as_bounds(&self) -> ComponentBounds {
216        self.inner.lock().unwrap().as_bounds()
217    }
218}
219
220/// A cloneable, read-only handle to a component registry.
221///
222/// This handle provides access to read-only operations such as creating an API handler or verifying bounds. Unlike
223/// [`ComponentRegistry`], it can be freely cloned and shared across ownership boundaries.
224///
225/// Obtained via [`ComponentRegistry::root`].
226#[derive(Clone)]
227pub struct ComponentRegistryHandle {
228    inner: Arc<Mutex<ComponentMetadata>>,
229}
230
231impl ComponentRegistryHandle {
232    /// Validates that all components are able to respect the calculated effective limit.
233    ///
234    /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
235    /// can be used for allocating memory.
236    ///
237    /// ## Errors
238    ///
239    /// A number of invalid conditions are checked and will cause an error to be returned:
240    ///
241    /// - when a component has invalid bounds (for example, minimum required bytes higher than firm limit)
242    /// - when the combined total of the firm limit for all components exceeds the effective limit
243    pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
244        let bounds = self.inner.lock().unwrap().as_bounds();
245        BoundsVerifier::new(initial_grant, bounds).verify()
246    }
247
248    /// Gets an API handler for reporting the memory bounds and resource usage of all component in the registry.
249    ///
250    /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
251    /// [`ResourceAPIHandler`] for more information about routes and responses.
252    pub fn api_handler(&self) -> ResourceAPIHandler {
253        ResourceAPIHandler::from_state(Arc::clone(&self.inner))
254    }
255
256    /// Gets the total minimum required bytes for all components in the registry.
257    ///
258    /// See [`ComponentRegistry::as_bounds`] for more details.
259    pub fn as_bounds(&self) -> ComponentBounds {
260        self.inner.lock().unwrap().as_bounds()
261    }
262}
263
// Pointer-identity helpers used only by the unit tests in this file.
#[cfg(test)]
impl ComponentRegistry {
    /// Returns `true` when both registries are scoped to the very same component allocation.
    fn inner_ptr_eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }

    /// Returns `true` when `handle` points at the root allocation of this registry's tree.
    fn root_ptr_eq(&self, handle: &ComponentRegistryHandle) -> bool {
        let root = self.get_root();
        Arc::ptr_eq(&root, &handle.inner)
    }
}
275
276impl Default for ComponentRegistry {
277    fn default() -> Self {
278        Self {
279            inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
280            root: None,
281        }
282    }
283}
284
285pub struct Minimum;
286pub struct Firm;
287
288pub(crate) mod private {
289    pub trait Sealed {}
290
291    impl Sealed for super::Minimum {}
292    impl Sealed for super::Firm {}
293}
294
295// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
296// or firm limit amounts.
297pub trait BoundsMutator: private::Sealed {
298    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
299}
300
301impl BoundsMutator for Minimum {
302    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
303        bounds.self_minimum_required_bytes.push(expr)
304    }
305}
306
307impl BoundsMutator for Firm {
308    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
309        bounds.self_firm_limit_bytes.push(expr)
310    }
311}
312
313/// Builder for defining the memory bounds of a component and its subcomponents.
314///
315/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
316/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
317/// individual bounds for each component in the topology.
318pub struct MemoryBoundsBuilder<'a> {
319    inner: ComponentRegistry,
320    _lt: PhantomData<&'a ()>,
321}
322
323impl MemoryBoundsBuilder<'static> {
324    #[cfg(test)]
325    pub(crate) fn for_test() -> Self {
326        Self {
327            inner: ComponentRegistry::default(),
328            _lt: PhantomData,
329        }
330    }
331}
332
333impl MemoryBoundsBuilder<'_> {
334    /// Resets the bounds of the current component to a default state.
335    ///
336    /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
337    /// specified, as not all components are able to be defined in a single pass.
338    pub fn reset(&mut self) {
339        let mut inner = self.inner.inner.lock().unwrap();
340        inner.reset();
341    }
342
343    /// Gets a builder object for defining the minimum bounds of the current component.
344    pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
345        let bounds = self.inner.inner.lock().unwrap();
346        BoundsBuilder::<'_, Minimum>::new(bounds)
347    }
348
349    /// Gets a builder object for defining the firm bounds of the current component.
350    ///
351    /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` don't need
352    /// to be added again here.
353    pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
354        let bounds = self.inner.inner.lock().unwrap();
355        BoundsBuilder::<'_, Firm>::new(bounds)
356    }
357
358    /// Creates a nested subcomponent and gets a builder object for it.
359    ///
360    /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
361    /// into the calculated bounds for the parent component.
362    pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
363    where
364        S: AsRef<str>,
365    {
366        let component = self.inner.get_or_create(name);
367        MemoryBoundsBuilder {
368            inner: component,
369            _lt: PhantomData,
370        }
371    }
372
373    /// Creates a nested subcomponent based on the given component.
374    ///
375    /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
376    pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
377    where
378        S: AsRef<str>,
379        C: MemoryBounds,
380    {
381        let mut builder = self.subcomponent(name);
382        component.specify_bounds(&mut builder);
383
384        self
385    }
386
387    #[cfg(test)]
388    pub(crate) fn as_bounds(&self) -> ComponentBounds {
389        self.inner.inner.lock().unwrap().as_bounds()
390    }
391}
392
393/// Bounds builder.
394///
395/// Helper type for defining the bounds of a component in a field-driven manner.
396pub struct BoundsBuilder<'a, S> {
397    inner: MutexGuard<'a, ComponentMetadata>,
398    _state: PhantomData<S>,
399}
400
401impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
402    fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
403        Self {
404            inner,
405            _state: PhantomData,
406        }
407    }
408
409    /// Accounts for the in-memory size of a single value.
410    ///
411    /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
412    /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
413    /// heap allocation exists that's the size of the component type.
414    pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
415        S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
416        self
417    }
418
419    /// Accounts for a fixed amount of memory usage.
420    ///
421    /// This is a catch-all for directly accounting for a specific number of bytes.
422    pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
423        S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
424        self
425    }
426
427    /// Accounts for an item container of the given length.
428    ///
429    /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
430    /// homogeneous and allocated contiguously.
431    pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
432        S::add_usage(
433            &mut self.inner.bounds,
434            UsageExpr::product(
435                "array",
436                UsageExpr::struct_size::<T>(name),
437                UsageExpr::constant("len", len),
438            ),
439        );
440        self
441    }
442
443    /// Accounts for a map container of the given length.
444    ///
445    /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
446    /// values are
447    pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
448        S::add_usage(
449            &mut self.inner.bounds,
450            UsageExpr::product(
451                "map",
452                UsageExpr::sum(
453                    name,
454                    UsageExpr::struct_size::<K>("key"),
455                    UsageExpr::struct_size::<V>("value"),
456                ),
457                UsageExpr::constant("len", len),
458            ),
459        );
460        self
461    }
462
463    pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
464        S::add_usage(&mut self.inner.bounds, expr);
465        self
466    }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    // A handle taken straight from the root registry must point at that same root.
    #[test]
    fn root_handle_from_root_registry_points_to_root() {
        let root_registry = ComponentRegistry::default();

        let root_handle = root_registry.root();

        assert!(root_registry.root_ptr_eq(&root_handle));
    }

    // A handle taken from a direct child must still point at the root, not at the child.
    #[test]
    fn root_handle_from_subcomponent_points_to_root() {
        let root_registry = ComponentRegistry::default();
        let child_registry = root_registry.get_or_create("child");

        let root_handle = child_registry.root();

        assert!(root_registry.root_ptr_eq(&root_handle));
        assert!(!child_registry.inner_ptr_eq(&root_registry));
    }

    // The root must be propagated through chained `get_or_create` calls.
    #[test]
    fn root_handle_from_deeply_nested_subcomponent_points_to_root() {
        let root_registry = ComponentRegistry::default();
        let child_registry = root_registry.get_or_create("child");
        let grandchild_registry = child_registry.get_or_create("grandchild");

        let root_handle = grandchild_registry.root();

        assert!(root_registry.root_ptr_eq(&root_handle));
    }

    // The root must also be propagated when the nesting comes from a single dotted path.
    #[test]
    fn root_handle_from_dotted_path_subcomponent_points_to_root() {
        let root_registry = ComponentRegistry::default();
        let nested_registry = root_registry.get_or_create("a.b.c");

        let root_handle = nested_registry.root();

        assert!(root_registry.root_ptr_eq(&root_handle));
    }

    // Cloning a handle must share the same underlying root rather than copying it.
    #[test]
    fn cloned_handle_points_to_root() {
        let root_registry = ComponentRegistry::default();
        let child_registry = root_registry.get_or_create("child");

        let original_handle = child_registry.root();
        let cloned_handle = original_handle.clone();

        assert!(Arc::ptr_eq(&original_handle.inner, &cloned_handle.inner));
        assert!(root_registry.root_ptr_eq(&cloned_handle));
    }
}