saluki_core/pooling/
fixed.rs1use std::{
2 collections::VecDeque,
3 future::Future,
4 pin::Pin,
5 sync::{Arc, Mutex},
6 task::{ready, Context, Poll},
7};
8
9use pin_project::pin_project;
10use tokio::sync::Semaphore;
11use tokio_util::sync::PollSemaphore;
12
13use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
14
15pub struct FixedSizeObjectPool<T: Poolable> {
26 strategy: Arc<FixedSizeStrategy<T>>,
27}
28
29impl<T: Poolable> FixedSizeObjectPool<T>
30where
31 T::Data: Default,
32{
33 pub fn with_capacity<S>(pool_name: S, capacity: usize) -> Self
37 where
38 S: Into<String>,
39 {
40 Self {
41 strategy: Arc::new(FixedSizeStrategy::new(pool_name, capacity)),
42 }
43 }
44}
45
46impl<T: Poolable> FixedSizeObjectPool<T> {
47 pub fn with_builder<S, B>(pool_name: S, capacity: usize, builder: B) -> Self
53 where
54 S: Into<String>,
55 B: Fn() -> T::Data,
56 {
57 Self {
58 strategy: Arc::new(FixedSizeStrategy::with_builder(pool_name, capacity, builder)),
59 }
60 }
61}
62
63impl<T: Poolable> Clone for FixedSizeObjectPool<T> {
64 fn clone(&self) -> Self {
65 Self {
66 strategy: self.strategy.clone(),
67 }
68 }
69}
70
71impl<T> ObjectPool for FixedSizeObjectPool<T>
72where
73 T: Poolable + Send + Unpin + 'static,
74{
75 type Item = T;
76 type AcquireFuture = FixedSizeAcquireFuture<T>;
77
78 fn acquire(&self) -> Self::AcquireFuture {
79 FixedSizeStrategy::acquire(&self.strategy)
80 }
81}
82
83struct FixedSizeStrategy<T: Poolable> {
84 items: Mutex<VecDeque<T::Data>>,
85 available: Arc<Semaphore>,
86 metrics: PoolMetrics,
87}
88
89impl<T> FixedSizeStrategy<T>
90where
91 T: Poolable,
92 T::Data: Default,
93{
94 fn new<S>(pool_name: S, capacity: usize) -> Self
95 where
96 S: Into<String>,
97 {
98 let mut items = VecDeque::with_capacity(capacity);
99 items.extend((0..capacity).map(|_| T::Data::default()));
100 let available = Arc::new(Semaphore::new(capacity));
101
102 Self {
103 items: Mutex::new(items),
104 available,
105 metrics: PoolMetrics::new(pool_name.into()),
106 }
107 }
108}
109
110impl<T: Poolable> FixedSizeStrategy<T> {
111 fn with_builder<S, B>(pool_name: S, capacity: usize, builder: B) -> Self
112 where
113 S: Into<String>,
114 B: Fn() -> T::Data,
115 {
116 let mut items = VecDeque::with_capacity(capacity);
117 items.extend((0..capacity).map(|_| builder()));
118 let available = Arc::new(Semaphore::new(capacity));
119
120 let metrics = PoolMetrics::new(pool_name.into());
121 metrics.created().increment(capacity as u64);
122 metrics.capacity().set(capacity as f64);
123
124 Self {
125 items: Mutex::new(items),
126 available,
127 metrics,
128 }
129 }
130}
131
132impl<T> FixedSizeStrategy<T>
133where
134 T: Poolable,
135 T::Data: Send + 'static,
136{
137 fn acquire(strategy: &Arc<Self>) -> FixedSizeAcquireFuture<T> {
138 FixedSizeAcquireFuture::new(Arc::clone(strategy))
139 }
140}
141
142impl<T: Poolable> ReclaimStrategy<T> for FixedSizeStrategy<T> {
143 fn reclaim(&self, mut data: T::Data) {
144 data.clear();
145
146 self.items.lock().unwrap().push_back(data);
147 self.available.add_permits(1);
148 self.metrics.released().increment(1);
149 self.metrics.in_use().decrement(1.0);
150 }
151}
152
153#[pin_project]
154pub struct FixedSizeAcquireFuture<T: Poolable> {
155 strategy: Option<Arc<FixedSizeStrategy<T>>>,
156 semaphore: PollSemaphore,
157}
158
159impl<T: Poolable> FixedSizeAcquireFuture<T> {
160 fn new(strategy: Arc<FixedSizeStrategy<T>>) -> Self {
161 let semaphore = PollSemaphore::new(Arc::clone(&strategy.available));
162 Self {
163 strategy: Some(strategy),
164 semaphore,
165 }
166 }
167}
168
169impl<T> Future for FixedSizeAcquireFuture<T>
170where
171 T: Poolable + 'static,
172{
173 type Output = T;
174
175 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176 let this = self.project();
177
178 match ready!(this.semaphore.poll_acquire(cx)) {
179 Some(permit) => {
180 permit.forget();
181
182 let strategy = this.strategy.take().unwrap();
183 let data = strategy.items.lock().unwrap().pop_back().unwrap();
184 strategy.metrics.acquired().increment(1);
185 strategy.metrics.in_use().increment(1.0);
186 Poll::Ready(T::from_data(strategy, data))
187 }
188 None => unreachable!("semaphore should never be closed"),
189 }
190 }
191}