memory_accounting/registry.rs
1use std::marker::PhantomData;
2use std::{
3 collections::HashMap,
4 sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9 allocator::{AllocationGroupRegistry, AllocationGroupToken},
10 api::MemoryAPIHandler,
11 BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, VerifiedBounds, VerifierError,
12};
13
14pub(crate) struct ComponentMetadata {
15 full_name: Option<String>,
16 bounds: ComponentBounds,
17 token: Option<AllocationGroupToken>,
18 subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
19}
20
21impl ComponentMetadata {
22 fn from_full_name(full_name: Option<String>) -> Self {
23 Self {
24 full_name,
25 bounds: ComponentBounds::default(),
26 token: None,
27 subcomponents: HashMap::new(),
28 }
29 }
30
31 pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
32 where
33 S: AsRef<str>,
34 {
35 // Split the name into the current level name and the remaining name.
36 //
37 // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
38 // together.
39 let name = name.as_ref();
40 let (current_level_name, remaining_name) = match name.split_once('.') {
41 Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
42 None => (name, None),
43 };
44
45 // Now we need to see if we have an existing component here or if we need to create a new one.
46 match self.subcomponents.get(current_level_name) {
47 Some(existing) => match remaining_name {
48 Some(remaining_name) => {
49 // We found an intermediate subcomponent, so keep recursing.
50 existing.lock().unwrap().get_or_create(remaining_name)
51 }
52 None => {
53 // We've found the leaf subcomponent.
54 Arc::clone(existing)
55 }
56 },
57 None => {
58 // We couldn't find the component at this level, so we need to create it.
59 //
60 // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
61 // avoid registering intermediate components that aren't actually used by the code, but are simply a
62 // consequence of wanting to having a nicely nested structure.
63 //
64 // We'll register a token for the component the first time it's requested.
65 let full_name = match self.full_name.as_ref() {
66 Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
67 None => current_level_name.to_string(),
68 };
69
70 let inner = self
71 .subcomponents
72 .entry(current_level_name.to_string())
73 .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
74
75 // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
76 // as-is.
77 match remaining_name {
78 Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
79 None => Arc::clone(inner),
80 }
81 }
82 }
83 }
84
85 fn token(&mut self) -> AllocationGroupToken {
86 match self.token {
87 Some(token) => token,
88 None => match self.full_name.as_deref() {
89 Some(full_name) => {
90 let allocator_component_registry = AllocationGroupRegistry::global();
91 let token = allocator_component_registry.register_allocation_group(full_name);
92 self.token = Some(token);
93
94 token
95 }
96 None => AllocationGroupToken::root(),
97 },
98 }
99 }
100
101 fn reset(&mut self) {
102 self.bounds = ComponentBounds::default();
103 self.subcomponents.clear();
104 }
105
106 pub fn self_bounds(&self) -> &ComponentBounds {
107 &self.bounds
108 }
109
110 pub fn as_bounds(&self) -> ComponentBounds {
111 let mut bounds = ComponentBounds::default();
112 bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
113 bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
114
115 for (name, subcomponent) in self.subcomponents.iter() {
116 let subcomponent = subcomponent.lock().unwrap();
117 let subcomponent_bounds = subcomponent.as_bounds();
118 bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
119 }
120
121 bounds
122 }
123}
124
125/// A registry for components for tracking memory bounds and runtime memory usage.
126///
127/// This registry provides a unified interface for declaring the memory bounds of a "component", as well as registering
128/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`.
129///
130/// ## Components
131///
132/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
133///
134/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
135/// forward data. The topology itself could be considered a component, and each individual source, transform, and
136/// destination within it could be subcomponents of the topology.
137///
138/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat "standalone",
139/// but this is not an absolute requirement and components can be nested more granularly for organizational/aesthetic
140/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
141/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
142/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
143/// and so on.
144///
145/// ## Bounds
146///
147/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
148/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
149/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
150/// defined from and how they are nested.
151///
152/// ## Allocation tracking
153///
154/// Every component is also able to be registered with its own allocation group when using the tracking allocator
155/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
156/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
157pub struct ComponentRegistry {
158 inner: Arc<Mutex<ComponentMetadata>>,
159 root: Option<Arc<Mutex<ComponentMetadata>>>,
160}
161
162impl ComponentRegistry {
163 fn get_root(&self) -> Arc<Mutex<ComponentMetadata>> {
164 match &self.root {
165 Some(root) => Arc::clone(root),
166 None => Arc::clone(&self.inner),
167 }
168 }
169
170 /// Creates a handle to this registry.
171 ///
172 /// The handle provides read-only access to root-level operations like creating API handlers and verifying bounds. It
173 /// can be freely cloned and shared.
174 pub fn root(&self) -> ComponentRegistryHandle {
175 ComponentRegistryHandle { inner: self.get_root() }
176 }
177
178 /// Gets a component by name, or creates it if it doesn't exist.
179 ///
180 /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
181 /// nested form is given, each component in the path will be created if it doesn't exist.
182 ///
183 /// Returns a `ComponentRegistry` scoped to the component.
184 pub fn get_or_create<S>(&self, name: S) -> Self
185 where
186 S: AsRef<str>,
187 {
188 let mut inner = self.inner.lock().unwrap();
189 Self {
190 inner: inner.get_or_create(name),
191 root: Some(self.get_root()),
192 }
193 }
194
195 /// Gets a bounds builder attached to the root component.
196 pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
197 MemoryBoundsBuilder {
198 inner: Self {
199 inner: Arc::clone(&self.inner),
200 root: Some(self.get_root()),
201 },
202 _lt: PhantomData,
203 }
204 }
205
206 /// Gets the tracking token for the component scoped to this registry.
207 ///
208 /// If the component is the root component (has no name), the root allocation token is returned. Otherwise, the
209 /// component is registered (using its full name) if it hasn't already been, and that token is returned.
210 pub fn token(&mut self) -> AllocationGroupToken {
211 let mut inner = self.inner.lock().unwrap();
212 inner.token()
213 }
214
215 /// Gets the total minimum required bytes for this component and all subcomponents.
216 pub fn as_bounds(&self) -> ComponentBounds {
217 self.inner.lock().unwrap().as_bounds()
218 }
219}
220
221/// A cloneable, read-only handle to a component registry.
222///
223/// This handle provides access to read-only operations such as creating an API handler or verifying bounds. Unlike
224/// [`ComponentRegistry`], it can be freely cloned and shared across ownership boundaries.
225///
226/// Obtained via [`ComponentRegistry::root`].
227#[derive(Clone)]
228pub struct ComponentRegistryHandle {
229 inner: Arc<Mutex<ComponentMetadata>>,
230}
231
232impl ComponentRegistryHandle {
233 /// Validates that all components are able to respect the calculated effective limit.
234 ///
235 /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
236 /// can be used for allocating memory.
237 ///
238 /// ## Errors
239 ///
240 /// A number of invalid conditions are checked and will cause an error to be returned:
241 ///
242 /// - when a component has invalid bounds (for example, minimum required bytes higher than firm limit)
243 /// - when the combined total of the firm limit for all components exceeds the effective limit
244 pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
245 let bounds = self.inner.lock().unwrap().as_bounds();
246 BoundsVerifier::new(initial_grant, bounds).verify()
247 }
248
249 /// Gets an API handler for reporting the memory bounds and usage of all component in the registry.
250 ///
251 /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
252 /// [`MemoryAPIHandler`] for more information about routes and responses.
253 pub fn api_handler(&self) -> MemoryAPIHandler {
254 MemoryAPIHandler::from_state(Arc::clone(&self.inner))
255 }
256
257 /// Gets the total minimum required bytes for all components in the registry.
258 ///
259 /// See [`ComponentRegistry::as_bounds`] for more details.
260 pub fn as_bounds(&self) -> ComponentBounds {
261 self.inner.lock().unwrap().as_bounds()
262 }
263}
264
265impl ComponentRegistry {
266 #[cfg(test)]
267 fn inner_ptr_eq(&self, other: &Self) -> bool {
268 Arc::ptr_eq(&self.inner, &other.inner)
269 }
270
271 #[cfg(test)]
272 fn root_ptr_eq(&self, handle: &ComponentRegistryHandle) -> bool {
273 Arc::ptr_eq(&self.get_root(), &handle.inner)
274 }
275}
276
277impl Default for ComponentRegistry {
278 fn default() -> Self {
279 Self {
280 inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
281 root: None,
282 }
283 }
284}
285
/// Builder state selecting a component's minimum required bytes (see `MemoryBoundsBuilder::minimum`).
pub struct Minimum;

/// Builder state selecting a component's firm limit bytes (see `MemoryBoundsBuilder::firm`).
pub struct Firm;
288
289pub(crate) mod private {
290 pub trait Sealed {}
291
292 impl Sealed for super::Minimum {}
293 impl Sealed for super::Firm {}
294}
295
296// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
297// or firm limit amounts.
298pub trait BoundsMutator: private::Sealed {
299 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
300}
301
302impl BoundsMutator for Minimum {
303 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
304 bounds.self_minimum_required_bytes.push(expr)
305 }
306}
307
308impl BoundsMutator for Firm {
309 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
310 bounds.self_firm_limit_bytes.push(expr)
311 }
312}
313
314/// Builder for defining the memory bounds of a component and its subcomponents.
315///
316/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
317/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
318/// individual bounds for each component in the topology.
319pub struct MemoryBoundsBuilder<'a> {
320 inner: ComponentRegistry,
321 _lt: PhantomData<&'a ()>,
322}
323
324impl MemoryBoundsBuilder<'static> {
325 #[cfg(test)]
326 pub(crate) fn for_test() -> Self {
327 Self {
328 inner: ComponentRegistry::default(),
329 _lt: PhantomData,
330 }
331 }
332}
333
334impl MemoryBoundsBuilder<'_> {
335 /// Resets the bounds of the current component to a default state.
336 ///
337 /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
338 /// specified, as not all components are able to be defined in a single pass.
339 pub fn reset(&mut self) {
340 let mut inner = self.inner.inner.lock().unwrap();
341 inner.reset();
342 }
343
344 /// Gets a builder object for defining the minimum bounds of the current component.
345 pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
346 let bounds = self.inner.inner.lock().unwrap();
347 BoundsBuilder::<'_, Minimum>::new(bounds)
348 }
349
350 /// Gets a builder object for defining the firm bounds of the current component.
351 ///
352 /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` do not need
353 /// to be added again here.
354 pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
355 let bounds = self.inner.inner.lock().unwrap();
356 BoundsBuilder::<'_, Firm>::new(bounds)
357 }
358
359 /// Creates a nested subcomponent and gets a builder object for it.
360 ///
361 /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
362 /// into the calculated bounds for the parent component.
363 pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
364 where
365 S: AsRef<str>,
366 {
367 let component = self.inner.get_or_create(name);
368 MemoryBoundsBuilder {
369 inner: component,
370 _lt: PhantomData,
371 }
372 }
373
374 /// Creates a nested subcomponent based on the given component.
375 ///
376 /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
377 pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
378 where
379 S: AsRef<str>,
380 C: MemoryBounds,
381 {
382 let mut builder = self.subcomponent(name);
383 component.specify_bounds(&mut builder);
384
385 self
386 }
387
388 #[cfg(test)]
389 pub(crate) fn as_bounds(&self) -> ComponentBounds {
390 self.inner.inner.lock().unwrap().as_bounds()
391 }
392}
393
394/// Bounds builder.
395///
396/// Helper type for defining the bounds of a component in a field-driven manner.
397pub struct BoundsBuilder<'a, S> {
398 inner: MutexGuard<'a, ComponentMetadata>,
399 _state: PhantomData<S>,
400}
401
402impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
403 fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
404 Self {
405 inner,
406 _state: PhantomData,
407 }
408 }
409
410 /// Accounts for the in-memory size of a single value.
411 ///
412 /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
413 /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
414 /// heap allocation exists that is the size of the component type.
415 pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
416 S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
417 self
418 }
419
420 /// Accounts for a fixed amount of memory usage.
421 ///
422 /// This is a catch-all for directly accounting for a specific number of bytes.
423 pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
424 S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
425 self
426 }
427
428 /// Accounts for an item container of the given length.
429 ///
430 /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
431 /// homogeneous and allocated contiguously.
432 pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
433 S::add_usage(
434 &mut self.inner.bounds,
435 UsageExpr::product(
436 "array",
437 UsageExpr::struct_size::<T>(name),
438 UsageExpr::constant("len", len),
439 ),
440 );
441 self
442 }
443
444 /// Accounts for a map container of the given length.
445 ///
446 /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
447 /// values are
448 pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
449 S::add_usage(
450 &mut self.inner.bounds,
451 UsageExpr::product(
452 "map",
453 UsageExpr::sum(
454 name,
455 UsageExpr::struct_size::<K>("key"),
456 UsageExpr::struct_size::<V>("value"),
457 ),
458 UsageExpr::constant("len", len),
459 ),
460 );
461 self
462 }
463
464 pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
465 S::add_usage(&mut self.inner.bounds, expr);
466 self
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn root_handle_from_root_registry_points_to_root() {
        let registry = ComponentRegistry::default();
        let handle = registry.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    #[test]
    fn root_handle_from_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let child = registry.get_or_create("child");

        let handle = child.root();

        assert!(registry.root_ptr_eq(&handle));
        assert!(!child.inner_ptr_eq(&registry));
    }

    #[test]
    fn root_handle_from_deeply_nested_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let grandchild = registry.get_or_create("child").get_or_create("grandchild");

        let handle = grandchild.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    #[test]
    fn root_handle_from_dotted_path_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let nested = registry.get_or_create("a.b.c");

        let handle = nested.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    #[test]
    fn cloned_handle_points_to_root() {
        let registry = ComponentRegistry::default();
        let child = registry.get_or_create("child");

        let handle = child.root();
        let cloned = handle.clone();

        assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
        assert!(registry.root_ptr_eq(&cloned));
    }

    #[test]
    fn repeated_lookup_returns_same_component() {
        let registry = ComponentRegistry::default();

        let first = registry.get_or_create("child");
        let second = registry.get_or_create("child");

        assert!(first.inner_ptr_eq(&second));
    }

    #[test]
    fn dotted_path_matches_chained_lookups() {
        let registry = ComponentRegistry::default();

        let dotted = registry.get_or_create("a.b.c");
        let chained = registry.get_or_create("a").get_or_create("b").get_or_create("c");

        assert!(dotted.inner_ptr_eq(&chained));
    }
}