resource_accounting/registry.rs
1use std::marker::PhantomData;
2use std::{
3 collections::HashMap,
4 sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9 api::ResourceAPIHandler, BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, ResourceGroupRegistry,
10 ResourceGroupToken, VerifiedBounds, VerifierError,
11};
12
13pub(crate) struct ComponentMetadata {
14 full_name: Option<String>,
15 bounds: ComponentBounds,
16 token: Option<ResourceGroupToken>,
17 subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
18}
19
20impl ComponentMetadata {
21 fn from_full_name(full_name: Option<String>) -> Self {
22 Self {
23 full_name,
24 bounds: ComponentBounds::default(),
25 token: None,
26 subcomponents: HashMap::new(),
27 }
28 }
29
30 pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
31 where
32 S: AsRef<str>,
33 {
34 // Split the name into the current level name and the remaining name.
35 //
36 // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
37 // together.
38 let name = name.as_ref();
39 let (current_level_name, remaining_name) = match name.split_once('.') {
40 Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
41 None => (name, None),
42 };
43
44 // Now we need to see if we have an existing component here or if we need to create a new one.
45 match self.subcomponents.get(current_level_name) {
46 Some(existing) => match remaining_name {
47 Some(remaining_name) => {
48 // We found an intermediate subcomponent, so keep recursing.
49 existing.lock().unwrap().get_or_create(remaining_name)
50 }
51 None => {
52 // We've found the leaf subcomponent.
53 Arc::clone(existing)
54 }
55 },
56 None => {
57 // We couldn't find the component at this level, so we need to create it.
58 //
59 // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
60 // avoid registering intermediate components that aren't actually used by the code, but are simply a
61 // consequence of wanting to having a nicely nested structure.
62 //
63 // We'll register a token for the component the first time it's requested.
64 let full_name = match self.full_name.as_ref() {
65 Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
66 None => current_level_name.to_string(),
67 };
68
69 let inner = self
70 .subcomponents
71 .entry(current_level_name.to_string())
72 .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
73
74 // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
75 // as-is.
76 match remaining_name {
77 Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
78 None => Arc::clone(inner),
79 }
80 }
81 }
82 }
83
84 fn token(&mut self) -> ResourceGroupToken {
85 match self.token {
86 Some(token) => token,
87 None => match self.full_name.as_deref() {
88 Some(full_name) => {
89 let allocator_component_registry = ResourceGroupRegistry::global();
90 let token = allocator_component_registry.register_resource_group(full_name);
91 self.token = Some(token);
92
93 token
94 }
95 None => ResourceGroupToken::root(),
96 },
97 }
98 }
99
100 fn reset(&mut self) {
101 self.bounds = ComponentBounds::default();
102 self.subcomponents.clear();
103 }
104
105 pub fn self_bounds(&self) -> &ComponentBounds {
106 &self.bounds
107 }
108
109 pub fn as_bounds(&self) -> ComponentBounds {
110 let mut bounds = ComponentBounds::default();
111 bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
112 bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
113
114 for (name, subcomponent) in self.subcomponents.iter() {
115 let subcomponent = subcomponent.lock().unwrap();
116 let subcomponent_bounds = subcomponent.as_bounds();
117 bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
118 }
119
120 bounds
121 }
122}
123
124/// A registry for components for tracking memory bounds and runtime memory usage.
125///
126/// This registry provides a unified interface for declaring the memory bounds of a _component_, as well as registering
127/// that component for runtime memory usage tracking when using the tracking allocator implementation in `resource-accounting`.
128///
129/// ## Components
130///
131/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
132///
133/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
134/// forward data. The topology itself could be considered a component, and each individual source, transform, and
135/// destination within it could be subcomponents of the topology.
136///
137/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat standalone,
138/// but this isn't an absolute requirement and components can be nested more granularly for organizational/aesthetic
139/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
140/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
141/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
142/// and so on.
143///
144/// ## Bounds
145///
146/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
147/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
148/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
149/// defined from and how they're nested.
150///
151/// ## Allocation tracking
152///
153/// Every component is also able to be registered with its own resource group when using the tracking allocator
154/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
155/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
156pub struct ComponentRegistry {
157 inner: Arc<Mutex<ComponentMetadata>>,
158 root: Option<Arc<Mutex<ComponentMetadata>>>,
159}
160
161impl ComponentRegistry {
162 fn get_root(&self) -> Arc<Mutex<ComponentMetadata>> {
163 match &self.root {
164 Some(root) => Arc::clone(root),
165 None => Arc::clone(&self.inner),
166 }
167 }
168
169 /// Creates a handle to this registry.
170 ///
171 /// The handle provides read-only access to root-level operations like creating API handlers and verifying bounds. It
172 /// can be freely cloned and shared.
173 pub fn root(&self) -> ComponentRegistryHandle {
174 ComponentRegistryHandle { inner: self.get_root() }
175 }
176
177 /// Gets a component by name, or creates it if it doesn't exist.
178 ///
179 /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
180 /// nested form is given, each component in the path will be created if it doesn't exist.
181 ///
182 /// Returns a `ComponentRegistry` scoped to the component.
183 pub fn get_or_create<S>(&self, name: S) -> Self
184 where
185 S: AsRef<str>,
186 {
187 let mut inner = self.inner.lock().unwrap();
188 Self {
189 inner: inner.get_or_create(name),
190 root: Some(self.get_root()),
191 }
192 }
193
194 /// Gets a bounds builder attached to the root component.
195 pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
196 MemoryBoundsBuilder {
197 inner: Self {
198 inner: Arc::clone(&self.inner),
199 root: Some(self.get_root()),
200 },
201 _lt: PhantomData,
202 }
203 }
204
205 /// Gets the tracking token for the component scoped to this registry.
206 ///
207 /// If the component is the root component (has no name), the root allocation token is returned. Otherwise, the
208 /// component is registered (using its full name) if it hasn't already been, and that token is returned.
209 pub fn token(&mut self) -> ResourceGroupToken {
210 let mut inner = self.inner.lock().unwrap();
211 inner.token()
212 }
213
214 /// Gets the total minimum required bytes for this component and all subcomponents.
215 pub fn as_bounds(&self) -> ComponentBounds {
216 self.inner.lock().unwrap().as_bounds()
217 }
218}
219
220/// A cloneable, read-only handle to a component registry.
221///
222/// This handle provides access to read-only operations such as creating an API handler or verifying bounds. Unlike
223/// [`ComponentRegistry`], it can be freely cloned and shared across ownership boundaries.
224///
225/// Obtained via [`ComponentRegistry::root`].
226#[derive(Clone)]
227pub struct ComponentRegistryHandle {
228 inner: Arc<Mutex<ComponentMetadata>>,
229}
230
231impl ComponentRegistryHandle {
232 /// Validates that all components are able to respect the calculated effective limit.
233 ///
234 /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
235 /// can be used for allocating memory.
236 ///
237 /// ## Errors
238 ///
239 /// A number of invalid conditions are checked and will cause an error to be returned:
240 ///
241 /// - when a component has invalid bounds (for example, minimum required bytes higher than firm limit)
242 /// - when the combined total of the firm limit for all components exceeds the effective limit
243 pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
244 let bounds = self.inner.lock().unwrap().as_bounds();
245 BoundsVerifier::new(initial_grant, bounds).verify()
246 }
247
248 /// Gets an API handler for reporting the memory bounds and resource usage of all component in the registry.
249 ///
250 /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
251 /// [`ResourceAPIHandler`] for more information about routes and responses.
252 pub fn api_handler(&self) -> ResourceAPIHandler {
253 ResourceAPIHandler::from_state(Arc::clone(&self.inner))
254 }
255
256 /// Gets the total minimum required bytes for all components in the registry.
257 ///
258 /// See [`ComponentRegistry::as_bounds`] for more details.
259 pub fn as_bounds(&self) -> ComponentBounds {
260 self.inner.lock().unwrap().as_bounds()
261 }
262}
263
264impl ComponentRegistry {
265 #[cfg(test)]
266 fn inner_ptr_eq(&self, other: &Self) -> bool {
267 Arc::ptr_eq(&self.inner, &other.inner)
268 }
269
270 #[cfg(test)]
271 fn root_ptr_eq(&self, handle: &ComponentRegistryHandle) -> bool {
272 Arc::ptr_eq(&self.get_root(), &handle.inner)
273 }
274}
275
276impl Default for ComponentRegistry {
277 fn default() -> Self {
278 Self {
279 inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
280 root: None,
281 }
282 }
283}
284
/// Builder state marker: usage expressions are added to a component's minimum required bytes.
pub struct Minimum;
/// Builder state marker: usage expressions are added to a component's firm limit bytes.
pub struct Firm;
287
288pub(crate) mod private {
289 pub trait Sealed {}
290
291 impl Sealed for super::Minimum {}
292 impl Sealed for super::Firm {}
293}
294
295// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
296// or firm limit amounts.
297pub trait BoundsMutator: private::Sealed {
298 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
299}
300
301impl BoundsMutator for Minimum {
302 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
303 bounds.self_minimum_required_bytes.push(expr)
304 }
305}
306
307impl BoundsMutator for Firm {
308 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
309 bounds.self_firm_limit_bytes.push(expr)
310 }
311}
312
313/// Builder for defining the memory bounds of a component and its subcomponents.
314///
315/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
316/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
317/// individual bounds for each component in the topology.
318pub struct MemoryBoundsBuilder<'a> {
319 inner: ComponentRegistry,
320 _lt: PhantomData<&'a ()>,
321}
322
323impl MemoryBoundsBuilder<'static> {
324 #[cfg(test)]
325 pub(crate) fn for_test() -> Self {
326 Self {
327 inner: ComponentRegistry::default(),
328 _lt: PhantomData,
329 }
330 }
331}
332
333impl MemoryBoundsBuilder<'_> {
334 /// Resets the bounds of the current component to a default state.
335 ///
336 /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
337 /// specified, as not all components are able to be defined in a single pass.
338 pub fn reset(&mut self) {
339 let mut inner = self.inner.inner.lock().unwrap();
340 inner.reset();
341 }
342
343 /// Gets a builder object for defining the minimum bounds of the current component.
344 pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
345 let bounds = self.inner.inner.lock().unwrap();
346 BoundsBuilder::<'_, Minimum>::new(bounds)
347 }
348
349 /// Gets a builder object for defining the firm bounds of the current component.
350 ///
351 /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` don't need
352 /// to be added again here.
353 pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
354 let bounds = self.inner.inner.lock().unwrap();
355 BoundsBuilder::<'_, Firm>::new(bounds)
356 }
357
358 /// Creates a nested subcomponent and gets a builder object for it.
359 ///
360 /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
361 /// into the calculated bounds for the parent component.
362 pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
363 where
364 S: AsRef<str>,
365 {
366 let component = self.inner.get_or_create(name);
367 MemoryBoundsBuilder {
368 inner: component,
369 _lt: PhantomData,
370 }
371 }
372
373 /// Creates a nested subcomponent based on the given component.
374 ///
375 /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
376 pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
377 where
378 S: AsRef<str>,
379 C: MemoryBounds,
380 {
381 let mut builder = self.subcomponent(name);
382 component.specify_bounds(&mut builder);
383
384 self
385 }
386
387 #[cfg(test)]
388 pub(crate) fn as_bounds(&self) -> ComponentBounds {
389 self.inner.inner.lock().unwrap().as_bounds()
390 }
391}
392
393/// Bounds builder.
394///
395/// Helper type for defining the bounds of a component in a field-driven manner.
396pub struct BoundsBuilder<'a, S> {
397 inner: MutexGuard<'a, ComponentMetadata>,
398 _state: PhantomData<S>,
399}
400
401impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
402 fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
403 Self {
404 inner,
405 _state: PhantomData,
406 }
407 }
408
409 /// Accounts for the in-memory size of a single value.
410 ///
411 /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
412 /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
413 /// heap allocation exists that's the size of the component type.
414 pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
415 S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
416 self
417 }
418
419 /// Accounts for a fixed amount of memory usage.
420 ///
421 /// This is a catch-all for directly accounting for a specific number of bytes.
422 pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
423 S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
424 self
425 }
426
427 /// Accounts for an item container of the given length.
428 ///
429 /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
430 /// homogeneous and allocated contiguously.
431 pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
432 S::add_usage(
433 &mut self.inner.bounds,
434 UsageExpr::product(
435 "array",
436 UsageExpr::struct_size::<T>(name),
437 UsageExpr::constant("len", len),
438 ),
439 );
440 self
441 }
442
443 /// Accounts for a map container of the given length.
444 ///
445 /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
446 /// values are
447 pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
448 S::add_usage(
449 &mut self.inner.bounds,
450 UsageExpr::product(
451 "map",
452 UsageExpr::sum(
453 name,
454 UsageExpr::struct_size::<K>("key"),
455 UsageExpr::struct_size::<V>("value"),
456 ),
457 UsageExpr::constant("len", len),
458 ),
459 );
460 self
461 }
462
463 pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
464 S::add_usage(&mut self.inner.bounds, expr);
465 self
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    /// A handle taken from the root registry must point at that registry's own component.
    #[test]
    fn root_handle_from_root_registry_points_to_root() {
        let registry = ComponentRegistry::default();
        let handle = registry.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    /// A handle taken from a direct child must point at the root, and the child must be a distinct component.
    #[test]
    fn root_handle_from_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let child = registry.get_or_create("child");

        let handle = child.root();

        assert!(registry.root_ptr_eq(&handle));
        assert!(!child.inner_ptr_eq(&registry));
    }

    /// The root must be carried through chained `get_or_create` calls.
    #[test]
    fn root_handle_from_deeply_nested_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let grandchild = registry.get_or_create("child").get_or_create("grandchild");

        let handle = grandchild.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    /// The root must be carried through a component created via a dotted path.
    #[test]
    fn root_handle_from_dotted_path_subcomponent_points_to_root() {
        let registry = ComponentRegistry::default();
        let nested = registry.get_or_create("a.b.c");

        let handle = nested.root();

        assert!(registry.root_ptr_eq(&handle));
    }

    /// Cloning a handle must yield another handle to the same root component.
    #[test]
    fn cloned_handle_points_to_root() {
        let registry = ComponentRegistry::default();
        let child = registry.get_or_create("child");

        let handle = child.root();
        let cloned = handle.clone();

        assert!(Arc::ptr_eq(&handle.inner, &cloned.inner));
        assert!(registry.root_ptr_eq(&cloned));
    }
}
523}