memory_accounting/registry.rs
1use std::marker::PhantomData;
2use std::{
3 collections::HashMap,
4 sync::{Arc, Mutex, MutexGuard},
5};
6
7use crate::UsageExpr;
8use crate::{
9 allocator::{AllocationGroupRegistry, AllocationGroupToken},
10 api::MemoryAPIHandler,
11 BoundsVerifier, ComponentBounds, MemoryBounds, MemoryGrant, VerifiedBounds, VerifierError,
12};
13
14pub(crate) struct ComponentMetadata {
15 full_name: Option<String>,
16 bounds: ComponentBounds,
17 token: Option<AllocationGroupToken>,
18 subcomponents: HashMap<String, Arc<Mutex<ComponentMetadata>>>,
19}
20
21impl ComponentMetadata {
22 fn from_full_name(full_name: Option<String>) -> Self {
23 Self {
24 full_name,
25 bounds: ComponentBounds::default(),
26 token: None,
27 subcomponents: HashMap::new(),
28 }
29 }
30
31 pub fn get_or_create<S>(&mut self, name: S) -> Arc<Mutex<Self>>
32 where
33 S: AsRef<str>,
34 {
35 // Split the name into the current level name and the remaining name.
36 //
37 // This lets us handle names which refer to a target nested component instead of having to chain a ton of calls
38 // together.
39 let name = name.as_ref();
40 let (current_level_name, remaining_name) = match name.split_once('.') {
41 Some((current_level_name, remaining_name)) => (current_level_name, Some(remaining_name)),
42 None => (name, None),
43 };
44
45 // Now we need to see if we have an existing component here or if we need to create a new one.
46 match self.subcomponents.get(current_level_name) {
47 Some(existing) => match remaining_name {
48 Some(remaining_name) => {
49 // We found an intermediate subcomponent, so keep recursing.
50 existing.lock().unwrap().get_or_create(remaining_name)
51 }
52 None => {
53 // We've found the leaf subcomponent.
54 Arc::clone(existing)
55 }
56 },
57 None => {
58 // We couldn't find the component at this level, so we need to create it.
59 //
60 // We do all of our name calculation and so on, but we also leave the token empty for now. We do this to
61 // avoid registering intermediate components that aren't actually used by the code, but are simply a
62 // consequence of wanting to having a nicely nested structure.
63 //
64 // We'll register a token for the component the first time it's requested.
65 let full_name = match self.full_name.as_ref() {
66 Some(parent_full_name) => format!("{}.{}", parent_full_name, current_level_name),
67 None => current_level_name.to_string(),
68 };
69
70 let inner = self
71 .subcomponents
72 .entry(current_level_name.to_string())
73 .or_insert_with(|| Arc::new(Mutex::new(Self::from_full_name(Some(full_name)))));
74
75 // If we still need to recurse further, do so here.. otherwise, return the subcomponent we just created
76 // as-is.
77 match remaining_name {
78 Some(remaining_name) => inner.lock().unwrap().get_or_create(remaining_name),
79 None => Arc::clone(inner),
80 }
81 }
82 }
83 }
84
85 fn token(&mut self) -> AllocationGroupToken {
86 match self.token {
87 Some(token) => token,
88 None => match self.full_name.as_deref() {
89 Some(full_name) => {
90 let allocator_component_registry = AllocationGroupRegistry::global();
91 let token = allocator_component_registry.register_allocation_group(full_name);
92 self.token = Some(token);
93
94 token
95 }
96 None => AllocationGroupToken::root(),
97 },
98 }
99 }
100
101 fn reset(&mut self) {
102 self.bounds = ComponentBounds::default();
103 self.subcomponents.clear();
104 }
105
106 pub fn self_bounds(&self) -> &ComponentBounds {
107 &self.bounds
108 }
109
110 pub fn as_bounds(&self) -> ComponentBounds {
111 let mut bounds = ComponentBounds::default();
112 bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone();
113 bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone();
114
115 for (name, subcomponent) in self.subcomponents.iter() {
116 let subcomponent = subcomponent.lock().unwrap();
117 let subcomponent_bounds = subcomponent.as_bounds();
118 bounds.subcomponents.insert(name.clone(), subcomponent_bounds);
119 }
120
121 bounds
122 }
123}
124
125/// A registry for components for tracking memory bounds and runtime memory usage.
126///
127/// This registry provides a unified interface for declaring the memory bounds of a "component", as well as registering
128/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`.
129///
130/// ## Components
131///
132/// **Components** are any logical grouping of memory usage within a program, and they can be arbitrarily nested.
133///
134/// For example, a data plane will generally have a topology that defines the components used to accept, process, and
135/// forward data. The topology itself could be considered a component, and each individual source, transform, and
136/// destination within it could be subcomponents of the topology.
137///
138/// Components are generally meant to be tied to something that has its own memory bounds and is somewhat "standalone",
139/// but this is not an absolute requirement and components can be nested more granularly for organizational/aesthetic
140/// purposes. Again, for example, one might opt to create a component in their topology for each component type --
141/// sources, transforms, and destinations -- and then add the actual instances of those components as subcomponents to
142/// each grouping, leading to a nested structure such as `topology/sources/source1`, `topology/transforms/transform1`,
143/// and so on.
144///
145/// ## Bounds
146///
147/// Every component is able to define memory bounds for itself and its subcomponents. A builder-style API is exposed to
148/// allow for ergonomically defining these bounds -- both minimum and firm -- for components, as well as extending the
149/// nestable aspect of the registry itself to the bounds builder, allowing for flexibility in where components are
150/// defined from and how they are nested.
151///
152/// ## Allocation tracking
153///
154/// Every component is also able to be registered with its own allocation group when using the tracking allocator
155/// implementation. This is done on demand when the component's token is requested, which avoids polluting the tracking
156/// allocator with components that are never actually used, such as those used for organizational/aesthetic purposes.
157pub struct ComponentRegistry {
158 inner: Arc<Mutex<ComponentMetadata>>,
159}
160
161impl ComponentRegistry {
162 /// Gets a component by name, or creates it if it doesn't exist.
163 ///
164 /// The name provided can be given in a direct (`component_name`) or nested (`path.to.component_name`) form. If the
165 /// nested form is given, each component in the path will be created if it doesn't exist.
166 ///
167 /// Returns a `ComponentRegistry` scoped to the component.
168 pub fn get_or_create<S>(&self, name: S) -> Self
169 where
170 S: AsRef<str>,
171 {
172 let mut inner = self.inner.lock().unwrap();
173 Self {
174 inner: inner.get_or_create(name),
175 }
176 }
177
178 /// Gets a bounds builder attached to the root component.
179 pub fn bounds_builder(&mut self) -> MemoryBoundsBuilder<'_> {
180 MemoryBoundsBuilder {
181 inner: Self {
182 inner: Arc::clone(&self.inner),
183 },
184 _lt: PhantomData,
185 }
186 }
187
188 /// Gets the tracking token for the component scoped to this registry.
189 ///
190 /// If the component is the root component (has no name), the root allocation token is returned. Otherwise, the
191 /// component is registered (using its full name) if it hasn't already been, and that token is returned.
192 pub fn token(&mut self) -> AllocationGroupToken {
193 let mut inner = self.inner.lock().unwrap();
194 inner.token()
195 }
196
197 /// Validates that all components are able to respect the calculated effective limit.
198 ///
199 /// If validation succeeds, `VerifiedBounds` is returned, which provides information about the effective limit that
200 /// can be used for allocating memory.
201 ///
202 /// ## Errors
203 ///
204 /// A number of invalid conditions are checked and will cause an error to be returned:
205 ///
206 /// - when a component has invalid bounds (e.g. minimum required bytes higher than firm limit)
207 /// - when the combined total of the firm limit for all components exceeds the effective limit
208 pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result<VerifiedBounds, VerifierError> {
209 let bounds = self.inner.lock().unwrap().as_bounds();
210 BoundsVerifier::new(initial_grant, bounds).verify()
211 }
212
213 /// Gets an API handler for reporting the memory bounds and usage of all components.
214 ///
215 /// This handler exposes routes for querying the memory bounds and usage of all registered components. See
216 /// [`MemoryAPIHandler`] for more information about routes and responses.
217 pub fn api_handler(&self) -> MemoryAPIHandler {
218 MemoryAPIHandler::from_state(Arc::clone(&self.inner))
219 }
220
221 /// Gets the total minimum required bytes for this component and all subcomponents.
222 pub fn as_bounds(&self) -> ComponentBounds {
223 self.inner.lock().unwrap().as_bounds()
224 }
225}
226
227impl Default for ComponentRegistry {
228 fn default() -> Self {
229 Self {
230 inner: Arc::new(Mutex::new(ComponentMetadata::from_full_name(None))),
231 }
232 }
233}
234
/// Builder state selecting a component's minimum required bytes.
pub struct Minimum;

/// Builder state selecting a component's firm limit bytes.
pub struct Firm;
237
238pub(crate) mod private {
239 pub trait Sealed {}
240
241 impl Sealed for super::Minimum {}
242 impl Sealed for super::Firm {}
243}
244
245// Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required
246// or firm limit amounts.
247pub trait BoundsMutator: private::Sealed {
248 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr);
249}
250
251impl BoundsMutator for Minimum {
252 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
253 bounds.self_minimum_required_bytes.push(expr)
254 }
255}
256
257impl BoundsMutator for Firm {
258 fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) {
259 bounds.self_firm_limit_bytes.push(expr)
260 }
261}
262
263/// Builder for defining the memory bounds of a component and its subcomponents.
264///
265/// This builder provides a simple interface for defining the minimum and firm bounds of a component, as well as
266/// declaring subcomponents. For example, a topology can contain its own "self" memory bounds, and then define the
267/// individual bounds for each component in the topology.
268pub struct MemoryBoundsBuilder<'a> {
269 inner: ComponentRegistry,
270 _lt: PhantomData<&'a ()>,
271}
272
273impl MemoryBoundsBuilder<'static> {
274 #[cfg(test)]
275 pub(crate) fn for_test() -> Self {
276 Self {
277 inner: ComponentRegistry::default(),
278 _lt: PhantomData,
279 }
280 }
281}
282
283impl MemoryBoundsBuilder<'_> {
284 /// Resets the bounds of the current component to a default state.
285 ///
286 /// This can be used in scenarios where the bounds of a component need to be redefined after they have been
287 /// specified, as not all components are able to be defined in a single pass.
288 pub fn reset(&mut self) {
289 let mut inner = self.inner.inner.lock().unwrap();
290 inner.reset();
291 }
292
293 /// Gets a builder object for defining the minimum bounds of the current component.
294 pub fn minimum(&mut self) -> BoundsBuilder<'_, Minimum> {
295 let bounds = self.inner.inner.lock().unwrap();
296 BoundsBuilder::<'_, Minimum>::new(bounds)
297 }
298
299 /// Gets a builder object for defining the firm bounds of the current component.
300 ///
301 /// The firm limit is additive with the minimum required memory, so entries that are added via `minimum` do not need
302 /// to be added again here.
303 pub fn firm(&mut self) -> BoundsBuilder<'_, Firm> {
304 let bounds = self.inner.inner.lock().unwrap();
305 BoundsBuilder::<'_, Firm>::new(bounds)
306 }
307
308 /// Creates a nested subcomponent and gets a builder object for it.
309 ///
310 /// This allows for defining the bounds of various subcomponents within a larger component, which are then rolled up
311 /// into the calculated bounds for the parent component.
312 pub fn subcomponent<S>(&mut self, name: S) -> MemoryBoundsBuilder<'_>
313 where
314 S: AsRef<str>,
315 {
316 let component = self.inner.get_or_create(name);
317 MemoryBoundsBuilder {
318 inner: component,
319 _lt: PhantomData,
320 }
321 }
322
323 /// Creates a nested subcomponent based on the given component.
324 ///
325 /// This allows for defining a subcomponent whose bounds come from an object that implements `MemoryBounds` directly.
326 pub fn with_subcomponent<S, C>(&mut self, name: S, component: &C) -> &mut Self
327 where
328 S: AsRef<str>,
329 C: MemoryBounds,
330 {
331 let mut builder = self.subcomponent(name);
332 component.specify_bounds(&mut builder);
333
334 self
335 }
336
337 #[cfg(test)]
338 pub(crate) fn as_bounds(&self) -> ComponentBounds {
339 self.inner.inner.lock().unwrap().as_bounds()
340 }
341}
342
343/// Bounds builder.
344///
345/// Helper type for defining the bounds of a component in a field-driven manner.
346pub struct BoundsBuilder<'a, S> {
347 inner: MutexGuard<'a, ComponentMetadata>,
348 _state: PhantomData<S>,
349}
350
351impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> {
352 fn new(inner: MutexGuard<'a, ComponentMetadata>) -> Self {
353 Self {
354 inner,
355 _state: PhantomData,
356 }
357 }
358
359 /// Accounts for the in-memory size of a single value.
360 ///
361 /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap
362 /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a
363 /// heap allocation exists that is the size of the component type.
364 pub fn with_single_value<T>(&mut self, name: impl Into<String>) -> &mut Self {
365 S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::<T>(name));
366 self
367 }
368
369 /// Accounts for a fixed amount of memory usage.
370 ///
371 /// This is a catch-all for directly accounting for a specific number of bytes.
372 pub fn with_fixed_amount(&mut self, name: impl Into<String>, chunk_size: usize) -> &mut Self {
373 S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size));
374 self
375 }
376
377 /// Accounts for an item container of the given length.
378 ///
379 /// This can be used to track the expected memory usage of generalized containers like `Vec<T>`, where items are
380 /// homogenous and allocated contiguously.
381 pub fn with_array<T>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
382 S::add_usage(
383 &mut self.inner.bounds,
384 UsageExpr::product(
385 "array",
386 UsageExpr::struct_size::<T>(name),
387 UsageExpr::constant("len", len),
388 ),
389 );
390 self
391 }
392
393 /// Accounts for a map container of the given length.
394 ///
395 /// This can be used to track the expected memory usage of generalized maps like `HashMap<K, V>`, where keys and
396 /// values are
397 pub fn with_map<K, V>(&mut self, name: impl Into<String>, len: usize) -> &mut Self {
398 S::add_usage(
399 &mut self.inner.bounds,
400 UsageExpr::product(
401 "map",
402 UsageExpr::sum(
403 name,
404 UsageExpr::struct_size::<K>("key"),
405 UsageExpr::struct_size::<V>("value"),
406 ),
407 UsageExpr::constant("len", len),
408 ),
409 );
410 self
411 }
412
413 pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self {
414 S::add_usage(&mut self.inner.bounds, expr);
415 self
416 }
417}