saluki_core/pooling/
on_demand.rs1use std::{
2 future::{ready, Ready},
3 sync::Arc,
4};
5
6use memory_accounting::allocator::AllocationGroupToken;
7
8use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
9
10pub struct OnDemandObjectPool<T: Poolable> {
14 strategy: Arc<OnDemandStrategy<T>>,
15}
16
17impl<T> OnDemandObjectPool<T>
18where
19 T: Poolable + 'static,
20 T::Data: Default,
21{
22 pub fn new<S>(pool_name: S) -> Self
24 where
25 S: Into<String>,
26 {
27 Self::with_builder(pool_name, T::Data::default)
28 }
29}
30
31impl<T> OnDemandObjectPool<T>
32where
33 T: Poolable + 'static,
34{
35 pub fn with_builder<S, B>(pool_name: S, builder: B) -> Self
39 where
40 S: Into<String>,
41 B: Fn() -> T::Data + Send + Sync + 'static,
42 {
43 let strategy = Arc::new(OnDemandStrategy::with_builder(pool_name, builder));
44
45 Self { strategy }
46 }
47}
48
49impl<T: Poolable> Clone for OnDemandObjectPool<T> {
50 fn clone(&self) -> Self {
51 Self {
52 strategy: self.strategy.clone(),
53 }
54 }
55}
56
57impl<T> ObjectPool for OnDemandObjectPool<T>
58where
59 T: Poolable + Send + Unpin + 'static,
60{
61 type Item = T;
62 type AcquireFuture = Ready<T>;
63
64 fn acquire(&self) -> Self::AcquireFuture {
65 let strategy = Arc::clone(&self.strategy);
66 let item = strategy.build();
67 ready(T::from_data(strategy, item))
68 }
69}
70
71struct OnDemandStrategy<T: Poolable> {
72 builder: Box<dyn Fn() -> T::Data + Send + Sync>,
73 alloc_group: AllocationGroupToken,
74 metrics: PoolMetrics,
75}
76
77impl<T: Poolable> OnDemandStrategy<T> {
78 fn with_builder<S, B>(pool_name: S, builder: B) -> Self
79 where
80 S: Into<String>,
81 B: Fn() -> T::Data + Send + Sync + 'static,
82 {
83 let builder = Box::new(builder);
84
85 let metrics = PoolMetrics::new(pool_name.into());
86 metrics.capacity().set(usize::MAX as f64);
87
88 Self {
89 builder,
90 alloc_group: AllocationGroupToken::current(),
91 metrics,
92 }
93 }
94
95 fn build(&self) -> T::Data {
96 self.metrics.created().increment(1);
97 self.metrics.in_use().increment(1.0);
98
99 let _ = self.alloc_group.enter();
100 (self.builder)()
101 }
102}
103
104impl<T: Poolable> ReclaimStrategy<T> for OnDemandStrategy<T> {
105 fn reclaim(&self, mut data: T::Data) {
106 data.clear();
107 drop(data);
108
109 self.metrics.released().increment(1);
110 self.metrics.in_use().decrement(1.0);
111 }
112}