Skip to main content

memory_accounting/
registry.rs

1use std::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9    allocator::{AllocationGroupRegistry, AllocationGroupToken},
10    api::MemoryAPIHandler,
11    BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, VerifiedBounds, VerifierError,
12};
13
14pub(crate) struct ComponentMetadata {
15    full_name: Option<String>,
16    bounds: ComponentBounds,
17    token: Option<AllocationGroupToken>,
18    subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
19}
20
21impl ComponentMetadata {
22    fn from_full_name(full_name: Option<String>) -> Self {
23        Self {
24            full_name,
25            bounds: ComponentBounds::default(),
26            token: None,
27            subcomponents: HashMap::new(),
28        }
29    }
30
31    pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
32    where
33        S: AsRef<str>,
34    {
35        // Split the name into the current level name and the remaining name.
36        //
37        // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
38        // together.
39        let name = name.as_ref();
40        let (current_level_name, remaining_name) = match name.split_once('.') {
41            Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
42            None => (name, None),
43        };
44
45        // Now we need to see if we have an existing component here or if we need to create a new one.
46        match self.subcomponents.get(current_level_name) {
47            Some(existing) => match remaining_name {
48                Some(remaining_name) => {
49                    // We found an intermediate subcomponent, so keep recursing.
50                    existing.lock().unwrap().get_or_create(remaining_name)
51                }
52                None => {
53                    // We've found the leaf subcomponent.
54                    Arc::clone(existing)
55                }
56            },
57            None => {
58                // We couldn't find the component at this level, so we need to create it.
59                //
60                // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
61                // avoid registering intermediate components that aren't actually used by the code, but are simply a
62                // consequence of wanting to having a nicely nested structure.
63                //
64                // We'll register a token for the component the first time it's requested.
65                let full_name = match self.full_name.as_ref() {
66                    Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
67                    None => current_level_name.to_string(),
68                };
69
70                let inner = self
71                    .subcomponents
72                    .entry(current_level_name.to_string())
73                    .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
74
75                // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
76                // as-is.
77                match remaining_name {
78                    Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
79                    None => Arc::clone(inner),
80                }
81            }
82        }
83    }
84
85    fn token(&mut self) -> AllocationGroupToken {
86        match self.token {
87            Some(token) => token,
88            None => match self.full_name.as_deref() {
89                Some(full_name) => {
90                    let allocator_component_registry = AllocationGroupRegistry::global();
91                    let token = allocator_component_registry.register_allocation_group(full_name);
92                    self.token = Some(token);
93
94                    token
95                }
96                None => AllocationGroupToken::root(),
97            },
98        }
99    }
100
101    fn reset(&mut self) {
102        self.bounds = ComponentBounds::default();
103        self.subcomponents.clear();
104    }
105
106    pub fn self_bounds(&self) -> &ComponentBounds {
107        &self.bounds
108    }
109
110    pub fn as_bounds(&self) -> ComponentBounds {
111        let mut bounds = ComponentBounds::default();
112        bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
113        bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
114
115        for (name, subcomponent) in self.subcomponents.iter() {
116            let subcomponent = subcomponent.lock().unwrap();
117            let subcomponent_bounds = subcomponent.as_bounds();
118            bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
119        }
120
121        bounds
122    }
123}
124
125/// A registry for components for tracking memory bounds and runtime memory usage.
126///
127/// This registry provides a unified interface for declaring the memory bounds of a "component", as well as registering
128/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`.
129///
130/// ## Components
131///
132/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
133///
134/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
135/// forward data. The topology itself could be considered a component, and each individual source, transform, and
136/// destination within it could be subcomponents of the topology.
137///
138/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat "standalone",
139/// but this is not an absolute requirement and components can be nested more granularly for organizational/aesthetic
140/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
141/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
142/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
143/// and so on.
144///
145/// ## Bounds
146///
147/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
148/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
149/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
150/// defined from and how they are nested.
151///
152/// ## Allocation tracking
153///
154/// Every component is also able to be registered with its own allocation group when using the tracking allocator
155/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
156/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
157pub struct ComponentRegistry {
158    inner: Arc<Mutex<ComponentMetadata>>,
159    root: Option<Arc<Mutex<ComponentMetadata>>>,
160}
161
162impl ComponentRegistry {
163    fn get_root(&self) -> Arc<Mutex<ComponentMetadata>> {
164        match &self.root {
165            Some(root) => Arc::clone(root),
166            None => Arc::clone(&self.inner),
167        }
168    }
169
170    /// Creates a handle to this registry.
171    ///
172    /// The handle provides read-only access to root-level operations like creating API handlers and verifying bounds. It
173    /// can be freely cloned and shared.
174    pub fn root(&self) -> ComponentRegistryHandle {
175        ComponentRegistryHandle { inner: self.get_root() }
176    }
177
178    /// Gets a component by name, or creates it if it doesn't exist.
179    ///
180    /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
181    /// nested form is given, each component in the path will be created if it doesn't exist.
182    ///
183    /// Returns a `ComponentRegistry` scoped to the component.
184    pub fn get_or_create<S>(&self, name: S) -> Self
185    where
186        S: AsRef<str>,
187    {
188        let mut inner = self.inner.lock().unwrap();
189        Self {
190            inner: inner.get_or_create(name),
191            root: Some(self.get_root()),
192        }
193    }
194
195    /// Gets a bounds builder attached to the root component.
196    pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
197        MemoryBoundsBuilder {
198            inner: Self {
199                inner: Arc::clone(&self.inner),
200                root: Some(self.get_root()),
201            },
202            _lt: PhantomData,
203        }
204    }
205
206    /// Gets the tracking token for the component scoped to this registry.
207    ///
208    /// If the component is the root component (has no name), the root allocation token is returned.  Otherwise, the
209    /// component is registered (using its full name) if it hasn't already been, and that token is returned.
210    pub fn token(&mut self) -> AllocationGroupToken {
211        let mut inner = self.inner.lock().unwrap();
212        inner.token()
213    }
214
215    /// Gets the total minimum required bytes for this component and all subcomponents.
216    pub fn as_bounds(&self) -> ComponentBounds {
217        self.inner.lock().unwrap().as_bounds()
218    }
219}
220
221/// A cloneable, read-only handle to a component registry.
222///
223/// This handle provides access to read-only operations such as creating an API handler or verifying bounds. Unlike
224/// [`ComponentRegistry`], it can be freely cloned and shared across ownership boundaries.
225///
226/// Obtained via [`ComponentRegistry::root`].
227#[derive(Clone)]
228pub struct ComponentRegistryHandle {
229    inner: Arc<Mutex<ComponentMetadata>>,
230}
231
232impl ComponentRegistryHandle {
233    /// Validates that all components are able to respect the calculated effective limit.
234    ///
235    /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
236    /// can be used for allocating memory.
237    ///
238    /// ## Errors
239    ///
240    /// A number of invalid conditions are checked and will cause an error to be returned:
241    ///
242    /// - when a component has invalid bounds (for example, minimum required bytes higher than firm limit)
243    /// - when the combined total of the firm limit for all components exceeds the effective limit
244    pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
245        let bounds = self.inner.lock().unwrap().as_bounds();
246        BoundsVerifier::new(initial_grant, bounds).verify()
247    }
248
249    /// Gets an API handler for reporting the memory bounds and usage of all component in the registry.
250    ///
251    /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
252    /// [`MemoryAPIHandler`] for more information about routes and responses.
253    pub fn api_handler(&self) -> MemoryAPIHandler {
254        MemoryAPIHandler::from_state(Arc::clone(&self.inner))
255    }
256
257    /// Gets the total minimum required bytes for all components in the registry.
258    ///
259    /// See [`ComponentRegistry::as_bounds`] for more details.
260    pub fn as_bounds(&self) -> ComponentBounds {
261        self.inner.lock().unwrap().as_bounds()
262    }
263}
264
265impl ComponentRegistry {
266    #[cfg(test)]
267    fn inner_ptr_eq(&self, other: &Self) -> bool {
268        Arc::ptr_eq(&self.inner, &other.inner)
269    }
270
271    #[cfg(test)]
272    fn root_ptr_eq(&self, handle: &ComponentRegistryHandle) -> bool {
273        Arc::ptr_eq(&self.get_root(), &handle.inner)
274    }
275}
276
277impl Default for ComponentRegistry {
278    fn default() -> Self {
279        Self {
280            inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
281            root: None,
282        }
283    }
284}
285
286pub struct Minimum;
287pub struct Firm;
288
289pub(crate) mod private {
290    pub trait Sealed {}
291
292    impl Sealed for super::Minimum {}
293    impl Sealed for super::Firm {}
294}
295
296// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
297// or firm limit amounts.
298pub trait BoundsMutator: private::Sealed {
299    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
300}
301
302impl BoundsMutator for Minimum {
303    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
304        bounds.self_minimum_required_bytes.push(expr)
305    }
306}
307
308impl BoundsMutator for Firm {
309    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
310        bounds.self_firm_limit_bytes.push(expr)
311    }
312}
313
314/// Builder for defining the memory bounds of a component and its subcomponents.
315///
316/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
317/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
318/// individual bounds for each component in the topology.
319pub struct MemoryBoundsBuilder<'a> {
320    inner: ComponentRegistry,
321    _lt: PhantomData<&'a ()>,
322}
323
324impl MemoryBoundsBuilder<'static> {
325    #[cfg(test)]
326    pub(crate) fn for_test() -> Self {
327        Self {
328            inner: ComponentRegistry::default(),
329            _lt: PhantomData,
330        }
331    }
332}
333
334impl MemoryBoundsBuilder<'_> {
335    /// Resets the bounds of the current component to a default state.
336    ///
337    /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
338    /// specified, as not all components are able to be defined in a single pass.
339    pub fn reset(&mut self) {
340        let mut inner = self.inner.inner.lock().unwrap();
341        inner.reset();
342    }
343
344    /// Gets a builder object for defining the minimum bounds of the current component.
345    pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
346        let bounds = self.inner.inner.lock().unwrap();
347        BoundsBuilder::<'_, Minimum>::new(bounds)
348    }
349
350    /// Gets a builder object for defining the firm bounds of the current component.
351    ///
352    /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` do not need
353    /// to be added again here.
354    pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
355        let bounds = self.inner.inner.lock().unwrap();
356        BoundsBuilder::<'_, Firm>::new(bounds)
357    }
358
359    /// Creates a nested subcomponent and gets a builder object for it.
360    ///
361    /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
362    /// into the calculated bounds for the parent component.
363    pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
364    where
365        S: AsRef<str>,
366    {
367        let component = self.inner.get_or_create(name);
368        MemoryBoundsBuilder {
369            inner: component,
370            _lt: PhantomData,
371        }
372    }
373
374    /// Creates a nested subcomponent based on the given component.
375    ///
376    /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
377    pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
378    where
379        S: AsRef<str>,
380        C: MemoryBounds,
381    {
382        let mut builder = self.subcomponent(name);
383        component.specify_bounds(&mut builder);
384
385        self
386    }
387
388    #[cfg(test)]
389    pub(crate) fn as_bounds(&self) -> ComponentBounds {
390        self.inner.inner.lock().unwrap().as_bounds()
391    }
392}
393
394/// Bounds builder.
395///
396/// Helper type for defining the bounds of a component in a field-driven manner.
397pub struct BoundsBuilder<'a, S> {
398    inner: MutexGuard<'a, ComponentMetadata>,
399    _state: PhantomData<S>,
400}
401
402impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
403    fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
404        Self {
405            inner,
406            _state: PhantomData,
407        }
408    }
409
410    /// Accounts for the in-memory size of a single value.
411    ///
412    /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
413    /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
414    /// heap allocation exists that is the size of the component type.
415    pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
416        S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
417        self
418    }
419
420    /// Accounts for a fixed amount of memory usage.
421    ///
422    /// This is a catch-all for directly accounting for a specific number of bytes.
423    pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
424        S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
425        self
426    }
427
428    /// Accounts for an item container of the given length.
429    ///
430    /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
431    /// homogeneous and allocated contiguously.
432    pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
433        S::add_usage(
434            &mut self.inner.bounds,
435            UsageExpr::product(
436                "array",
437                UsageExpr::struct_size::<T>(name),
438                UsageExpr::constant("len", len),
439            ),
440        );
441        self
442    }
443
444    /// Accounts for a map container of the given length.
445    ///
446    /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
447    /// values are
448    pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
449        S::add_usage(
450            &mut self.inner.bounds,
451            UsageExpr::product(
452                "map",
453                UsageExpr::sum(
454                    name,
455                    UsageExpr::struct_size::<K>("key"),
456                    UsageExpr::struct_size::<V>("value"),
457                ),
458                UsageExpr::constant("len", len),
459            ),
460        );
461        self
462    }
463
464    pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
465        S::add_usage(&mut self.inner.bounds, expr);
466        self
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473
474    #[test]
475    fn root_handle_from_root_registry_points_to_root() {
476        let registry = ComponentRegistry::default();
477        let handle = registry.root();
478
479        assert!(registry.root_ptr_eq(&handle));
480    }
481
482    #[test]
483    fn root_handle_from_subcomponent_points_to_root() {
484        let registry = ComponentRegistry::default();
485        let child = registry.get_or_create("child");
486
487        let handle = child.root();
488
489        assert!(registry.root_ptr_eq(&handle));
490        assert!(!child.inner_ptr_eq(&registry));
491    }
492
493    #[test]
494    fn root_handle_from_deeply_nested_subcomponent_points_to_root() {
495        let registry = ComponentRegistry::default();
496        let grandchild = registry.get_or_create("child").get_or_create("grandchild");
497
498        let handle = grandchild.root();
499
500        assert!(registry.root_ptr_eq(&handle));
501    }
502
503    #[test]
504    fn root_handle_from_dotted_path_subcomponent_points_to_root() {
505        let registry = ComponentRegistry::default();
506        let nested = registry.get_or_create("a.b.c");
507
508        let handle = nested.root();
509
510        assert!(registry.root_ptr_eq(&handle));
511    }
512
513    #[test]
514    fn cloned_handle_points_to_root() {
515        let registry = ComponentRegistry::default();
516        let child = registry.get_or_create("child");
517
518        let handle = child.root();
519        let cloned = handle.clone();
520
521        assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
522        assert!(registry.root_ptr_eq(&cloned));
523    }
524}