saluki_core/pooling/
on_demand.rs

1use std::{
2    future::{ready, Ready},
3    sync::Arc,
4};
5
6use memory_accounting::allocator::AllocationGroupToken;
7
8use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
9
10/// An object pool that allocates objects on demand.
11///
12/// This pool implementation is meant to satisfy the interface of [`ObjectPool`] without performing any actual pooling.
13pub struct OnDemandObjectPool<T: Poolable> {
14    strategy: Arc<OnDemandStrategy<T>>,
15}
16
17impl<T> OnDemandObjectPool<T>
18where
19    T: Poolable + 'static,
20    T::Data: Default,
21{
22    /// Creates a new `OnDemandObjectPool`.
23    pub fn new<S>(pool_name: S) -> Self
24    where
25        S: Into<String>,
26    {
27        Self::with_builder(pool_name, T::Data::default)
28    }
29}
30
31impl<T> OnDemandObjectPool<T>
32where
33    T: Poolable + 'static,
34{
35    /// Creates a new `OnDemandObjectPool` with the given item builder.
36    ///
37    /// `builder` is called to construct each item.
38    pub fn with_builder<S, B>(pool_name: S, builder: B) -> Self
39    where
40        S: Into<String>,
41        B: Fn() -> T::Data + Send + Sync + 'static,
42    {
43        let strategy = Arc::new(OnDemandStrategy::with_builder(pool_name, builder));
44
45        Self { strategy }
46    }
47}
48
49impl<T: Poolable> Clone for OnDemandObjectPool<T> {
50    fn clone(&self) -> Self {
51        Self {
52            strategy: self.strategy.clone(),
53        }
54    }
55}
56
57impl<T> ObjectPool for OnDemandObjectPool<T>
58where
59    T: Poolable + Send + Unpin + 'static,
60{
61    type Item = T;
62    type AcquireFuture = Ready<T>;
63
64    fn acquire(&self) -> Self::AcquireFuture {
65        let strategy = Arc::clone(&self.strategy);
66        let item = strategy.build();
67        ready(T::from_data(strategy, item))
68    }
69}
70
71struct OnDemandStrategy<T: Poolable> {
72    builder: Box<dyn Fn() -> T::Data + Send + Sync>,
73    alloc_group: AllocationGroupToken,
74    metrics: PoolMetrics,
75}
76
77impl<T: Poolable> OnDemandStrategy<T> {
78    fn with_builder<S, B>(pool_name: S, builder: B) -> Self
79    where
80        S: Into<String>,
81        B: Fn() -> T::Data + Send + Sync + 'static,
82    {
83        let builder = Box::new(builder);
84
85        let metrics = PoolMetrics::new(pool_name.into());
86        metrics.capacity().set(usize::MAX as f64);
87
88        Self {
89            builder,
90            alloc_group: AllocationGroupToken::current(),
91            metrics,
92        }
93    }
94
95    fn build(&self) -> T::Data {
96        self.metrics.created().increment(1);
97        self.metrics.in_use().increment(1.0);
98
99        let _ = self.alloc_group.enter();
100        (self.builder)()
101    }
102}
103
104impl<T: Poolable> ReclaimStrategy<T> for OnDemandStrategy<T> {
105    fn reclaim(&self, mut data: T::Data) {
106        data.clear();
107        drop(data);
108
109        self.metrics.released().increment(1);
110        self.metrics.in_use().decrement(1.0);
111    }
112}