memory_accounting/
registry.rs

1use std::marker::PhantomData;
2use std::{
3    collections::HashMap,
4    sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9    allocator::{AllocationGroupRegistry, AllocationGroupToken},
10    api::MemoryAPIHandler,
11    BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, VerifiedBounds, VerifierError,
12};
13
14pub(crate) struct ComponentMetadata {
15    full_name: Option<String>,
16    bounds: ComponentBounds,
17    token: Option<AllocationGroupToken>,
18    subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
19}
20
21impl ComponentMetadata {
22    fn from_full_name(full_name: Option<String>) -> Self {
23        Self {
24            full_name,
25            bounds: ComponentBounds::default(),
26            token: None,
27            subcomponents: HashMap::new(),
28        }
29    }
30
31    pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
32    where
33        S: AsRef<str>,
34    {
35        // Split the name into the current level name and the remaining name.
36        //
37        // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
38        // together.
39        let name = name.as_ref();
40        let (current_level_name, remaining_name) = match name.split_once('.') {
41            Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
42            None => (name, None),
43        };
44
45        // Now we need to see if we have an existing component here or if we need to create a new one.
46        match self.subcomponents.get(current_level_name) {
47            Some(existing) => match remaining_name {
48                Some(remaining_name) => {
49                    // We found an intermediate subcomponent, so keep recursing.
50                    existing.lock().unwrap().get_or_create(remaining_name)
51                }
52                None => {
53                    // We've found the leaf subcomponent.
54                    Arc::clone(existing)
55                }
56            },
57            None => {
58                // We couldn't find the component at this level, so we need to create it.
59                //
60                // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
61                // avoid registering intermediate components that aren't actually used by the code, but are simply a
62                // consequence of wanting to having a nicely nested structure.
63                //
64                // We'll register a token for the component the first time it's requested.
65                let full_name = match self.full_name.as_ref() {
66                    Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
67                    None => current_level_name.to_string(),
68                };
69
70                let inner = self
71                    .subcomponents
72                    .entry(current_level_name.to_string())
73                    .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
74
75                // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
76                // as-is.
77                match remaining_name {
78                    Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
79                    None => Arc::clone(inner),
80                }
81            }
82        }
83    }
84
85    fn token(&mut self) -> AllocationGroupToken {
86        match self.token {
87            Some(token) => token,
88            None => match self.full_name.as_deref() {
89                Some(full_name) => {
90                    let allocator_component_registry = AllocationGroupRegistry::global();
91                    let token = allocator_component_registry.register_allocation_group(full_name);
92                    self.token = Some(token);
93
94                    token
95                }
96                None => AllocationGroupToken::root(),
97            },
98        }
99    }
100
101    fn reset(&mut self) {
102        self.bounds = ComponentBounds::default();
103        self.subcomponents.clear();
104    }
105
106    pub fn self_bounds(&self) -> &ComponentBounds {
107        &self.bounds
108    }
109
110    pub fn as_bounds(&self) -> ComponentBounds {
111        let mut bounds = ComponentBounds::default();
112        bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
113        bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
114
115        for (name, subcomponent) in self.subcomponents.iter() {
116            let subcomponent = subcomponent.lock().unwrap();
117            let subcomponent_bounds = subcomponent.as_bounds();
118            bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
119        }
120
121        bounds
122    }
123}
124
125/// A registry for components for tracking memory bounds and runtime memory usage.
126///
127/// This registry provides a unified interface for declaring the memory bounds of a "component", as well as registering
128/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`.
129///
130/// ## Components
131///
132/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
133///
134/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
135/// forward data. The topology itself could be considered a component, and each individual source, transform, and
136/// destination within it could be subcomponents of the topology.
137///
138/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat "standalone",
139/// but this is not an absolute requirement and components can be nested more granularly for organizational/aesthetic
140/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
141/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
142/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
143/// and so on.
144///
145/// ## Bounds
146///
147/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
148/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
149/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
150/// defined from and how they are nested.
151///
152/// ## Allocation tracking
153///
154/// Every component is also able to be registered with its own allocation group when using the tracking allocator
155/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
156/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
157pub struct ComponentRegistry {
158    inner: Arc<Mutex<ComponentMetadata>>,
159}
160
161impl ComponentRegistry {
162    /// Gets a component by name, or creates it if it doesn't exist.
163    ///
164    /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
165    /// nested form is given, each component in the path will be created if it doesn't exist.
166    ///
167    /// Returns a `ComponentRegistry` scoped to the component.
168    pub fn get_or_create<S>(&self, name: S) -> Self
169    where
170        S: AsRef<str>,
171    {
172        let mut inner = self.inner.lock().unwrap();
173        Self {
174            inner: inner.get_or_create(name),
175        }
176    }
177
178    /// Gets a bounds builder attached to the root component.
179    pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
180        MemoryBoundsBuilder {
181            inner: Self {
182                inner: Arc::clone(&self.inner),
183            },
184            _lt: PhantomData,
185        }
186    }
187
188    /// Gets the tracking token for the component scoped to this registry.
189    ///
190    /// If the component is the root component (has no name), the root allocation token is returned.  Otherwise, the
191    /// component is registered (using its full name) if it hasn't already been, and that token is returned.
192    pub fn token(&mut self) -> AllocationGroupToken {
193        let mut inner = self.inner.lock().unwrap();
194        inner.token()
195    }
196
197    /// Validates that all components are able to respect the calculated effective limit.
198    ///
199    /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
200    /// can be used for allocating memory.
201    ///
202    /// ## Errors
203    ///
204    /// A number of invalid conditions are checked and will cause an error to be returned:
205    ///
206    /// - when a component has invalid bounds (e.g. minimum required bytes higher than firm limit)
207    /// - when the combined total of the firm limit for all components exceeds the effective limit
208    pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
209        let bounds = self.inner.lock().unwrap().as_bounds();
210        BoundsVerifier::new(initial_grant, bounds).verify()
211    }
212
213    /// Gets an API handler for reporting the memory bounds and usage of all components.
214    ///
215    /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
216    /// [`MemoryAPIHandler`] for more information about routes and responses.
217    pub fn api_handler(&self) -> MemoryAPIHandler {
218        MemoryAPIHandler::from_state(Arc::clone(&self.inner))
219    }
220
221    /// Gets the total minimum required bytes for this component and all subcomponents.
222    pub fn as_bounds(&self) -> ComponentBounds {
223        self.inner.lock().unwrap().as_bounds()
224    }
225}
226
227impl Default for ComponentRegistry {
228    fn default() -> Self {
229        Self {
230            inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
231        }
232    }
233}
234
/// Builder state marker: usage expressions are added to a component's minimum required bytes.
pub struct Minimum;

/// Builder state marker: usage expressions are added to a component's firm limit bytes.
pub struct Firm;

pub(crate) mod private {
    use super::{Firm, Minimum};

    /// Sealing trait: only the marker types listed in this module can implement traits bounded on it.
    pub trait Sealed {}

    impl Sealed for Minimum {}
    impl Sealed for Firm {}
}
244
245// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
246// or firm limit amounts.
247pub trait BoundsMutator: private::Sealed {
248    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
249}
250
251impl BoundsMutator for Minimum {
252    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
253        bounds.self_minimum_required_bytes.push(expr)
254    }
255}
256
257impl BoundsMutator for Firm {
258    fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
259        bounds.self_firm_limit_bytes.push(expr)
260    }
261}
262
263/// Builder for defining the memory bounds of a component and its subcomponents.
264///
265/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
266/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
267/// individual bounds for each component in the topology.
268pub struct MemoryBoundsBuilder<'a> {
269    inner: ComponentRegistry,
270    _lt: PhantomData<&'a ()>,
271}
272
273impl MemoryBoundsBuilder<'static> {
274    #[cfg(test)]
275    pub(crate) fn for_test() -> Self {
276        Self {
277            inner: ComponentRegistry::default(),
278            _lt: PhantomData,
279        }
280    }
281}
282
283impl MemoryBoundsBuilder<'_> {
284    /// Resets the bounds of the current component to a default state.
285    ///
286    /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
287    /// specified, as not all components are able to be defined in a single pass.
288    pub fn reset(&mut self) {
289        let mut inner = self.inner.inner.lock().unwrap();
290        inner.reset();
291    }
292
293    /// Gets a builder object for defining the minimum bounds of the current component.
294    pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
295        let bounds = self.inner.inner.lock().unwrap();
296        BoundsBuilder::<'_, Minimum>::new(bounds)
297    }
298
299    /// Gets a builder object for defining the firm bounds of the current component.
300    ///
301    /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` do not need
302    /// to be added again here.
303    pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
304        let bounds = self.inner.inner.lock().unwrap();
305        BoundsBuilder::<'_, Firm>::new(bounds)
306    }
307
308    /// Creates a nested subcomponent and gets a builder object for it.
309    ///
310    /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
311    /// into the calculated bounds for the parent component.
312    pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
313    where
314        S: AsRef<str>,
315    {
316        let component = self.inner.get_or_create(name);
317        MemoryBoundsBuilder {
318            inner: component,
319            _lt: PhantomData,
320        }
321    }
322
323    /// Creates a nested subcomponent based on the given component.
324    ///
325    /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
326    pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
327    where
328        S: AsRef<str>,
329        C: MemoryBounds,
330    {
331        let mut builder = self.subcomponent(name);
332        component.specify_bounds(&mut builder);
333
334        self
335    }
336
337    #[cfg(test)]
338    pub(crate) fn as_bounds(&self) -> ComponentBounds {
339        self.inner.inner.lock().unwrap().as_bounds()
340    }
341}
342
343/// Bounds builder.
344///
345/// Helper type for defining the bounds of a component in a field-driven manner.
346pub struct BoundsBuilder<'a, S> {
347    inner: MutexGuard<'a, ComponentMetadata>,
348    _state: PhantomData<S>,
349}
350
351impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
352    fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
353        Self {
354            inner,
355            _state: PhantomData,
356        }
357    }
358
359    /// Accounts for the in-memory size of a single value.
360    ///
361    /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
362    /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
363    /// heap allocation exists that is the size of the component type.
364    pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
365        S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
366        self
367    }
368
369    /// Accounts for a fixed amount of memory usage.
370    ///
371    /// This is a catch-all for directly accounting for a specific number of bytes.
372    pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
373        S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
374        self
375    }
376
377    /// Accounts for an item container of the given length.
378    ///
379    /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
380    /// homogenous and allocated contiguously.
381    pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
382        S::add_usage(
383            &mut self.inner.bounds,
384            UsageExpr::product(
385                "array",
386                UsageExpr::struct_size::<T>(name),
387                UsageExpr::constant("len", len),
388            ),
389        );
390        self
391    }
392
393    /// Accounts for a map container of the given length.
394    ///
395    /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
396    /// values are
397    pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
398        S::add_usage(
399            &mut self.inner.bounds,
400            UsageExpr::product(
401                "map",
402                UsageExpr::sum(
403                    name,
404                    UsageExpr::struct_size::<K>("key"),
405                    UsageExpr::struct_size::<V>("value"),
406                ),
407                UsageExpr::constant("len", len),
408            ),
409        );
410        self
411    }
412
413    pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
414        S::add_usage(&mut self.inner.bounds, expr);
415        self
416    }
417}