1use std::{
2 collections::VecDeque,
3 future::Future,
4 pin::Pin,
5 sync::{
6 atomic::{
7 AtomicUsize,
8 Ordering::{AcqRel, Acquire, Relaxed},
9 },
10 Arc, Mutex,
11 },
12 task::{Context, Poll},
13 time::Duration,
14};
15
16use memory_accounting::allocator::AllocationGroupToken;
17use pin_project::pin_project;
18use tokio::{
19 sync::{OwnedSemaphorePermit, Semaphore},
20 time::sleep,
21};
22use tokio_util::sync::PollSemaphore;
23use tracing::{debug, trace};
24
25use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
26
27const SHRINKER_SLEEP_DURATION: Duration = Duration::from_secs(1);
28
29pub struct ElasticObjectPool<T: Poolable> {
43 strategy: Arc<ElasticStrategy<T>>,
44}
45
46impl<T> ElasticObjectPool<T>
47where
48 T: Poolable + 'static,
49 T::Data: Default,
50{
51 pub fn with_capacity<S>(pool_name: S, min_capacity: usize, max_capacity: usize) -> (Self, impl Future<Output = ()>)
53 where
54 S: Into<String>,
55 {
56 Self::with_builder(pool_name, min_capacity, max_capacity, T::Data::default)
57 }
58}
59
60impl<T> ElasticObjectPool<T>
61where
62 T: Poolable + 'static,
63{
64 pub fn with_builder<S, B>(
68 pool_name: S, min_capacity: usize, max_capacity: usize, builder: B,
69 ) -> (Self, impl Future<Output = ()>)
70 where
71 S: Into<String>,
72 B: Fn() -> T::Data + Send + Sync + 'static,
73 {
74 let strategy = Arc::new(ElasticStrategy::with_builder(
75 pool_name,
76 min_capacity,
77 max_capacity,
78 builder,
79 ));
80 let shrinker = run_background_shrinker(Arc::clone(&strategy));
81
82 (Self { strategy }, shrinker)
83 }
84}
85
86impl<T: Poolable> Clone for ElasticObjectPool<T> {
87 fn clone(&self) -> Self {
88 Self {
89 strategy: self.strategy.clone(),
90 }
91 }
92}
93
94impl<T> ObjectPool for ElasticObjectPool<T>
95where
96 T: Poolable + Send + Unpin + 'static,
97{
98 type Item = T;
99 type AcquireFuture = ElasticAcquireFuture<T>;
100
101 fn acquire(&self) -> Self::AcquireFuture {
102 ElasticStrategy::acquire(&self.strategy)
103 }
104}
105
106struct ElasticStrategy<T: Poolable> {
107 items: Mutex<VecDeque<T::Data>>,
108 builder: Box<dyn Fn() -> T::Data + Send + Sync>,
109 available: Arc<Semaphore>,
110 active: AtomicUsize,
111 on_demand_allocs: AtomicUsize,
112 min_capacity: usize,
113 max_capacity: usize,
114 alloc_group: AllocationGroupToken,
115 metrics: PoolMetrics,
116}
117
118impl<T: Poolable> ElasticStrategy<T> {
119 fn with_builder<S, B>(pool_name: S, min_capacity: usize, max_capacity: usize, builder: B) -> Self
120 where
121 S: Into<String>,
122 B: Fn() -> T::Data + Send + Sync + 'static,
123 {
124 let builder = Box::new(builder);
125
126 let mut items = VecDeque::with_capacity(max_capacity);
128 items.extend((0..min_capacity).map(|_| builder()));
129 let available = Arc::new(Semaphore::new(min_capacity));
130
131 let metrics = PoolMetrics::new(pool_name.into());
132 metrics.capacity().set(min_capacity as f64);
133 metrics.created().increment(min_capacity as u64);
134
135 Self {
136 items: Mutex::new(items),
137 builder,
138 available,
139 active: AtomicUsize::new(min_capacity),
140 on_demand_allocs: AtomicUsize::new(0),
141 min_capacity,
142 max_capacity,
143 alloc_group: AllocationGroupToken::current(),
144 metrics,
145 }
146 }
147
148 fn acquire_item(&self, permit: OwnedSemaphorePermit) -> T::Data {
149 permit.forget();
150
151 let data = { self.items.lock().unwrap().pop_back().unwrap() };
152
153 self.metrics.acquired().increment(1);
154 self.metrics.in_use().increment(1.0);
155
156 data
157 }
158}
159
160impl<T> ElasticStrategy<T>
161where
162 T: Poolable,
163 T::Data: Send + 'static,
164{
165 fn acquire(strategy: &Arc<Self>) -> ElasticAcquireFuture<T> {
166 ElasticAcquireFuture::new(Arc::clone(strategy))
167 }
168}
169
170impl<T: Poolable> ReclaimStrategy<T> for ElasticStrategy<T> {
171 fn reclaim(&self, mut data: T::Data) {
172 data.clear();
173
174 self.items.lock().unwrap().push_back(data);
175 self.available.add_permits(1);
176 self.metrics.released().increment(1);
177 self.metrics.in_use().decrement(1.0);
178 }
179}
180
181#[pin_project]
183pub struct ElasticAcquireFuture<T: Poolable> {
184 strategy: Option<Arc<ElasticStrategy<T>>>,
185 waiting_slow: bool,
186 semaphore: PollSemaphore,
187}
188
189impl<T: Poolable> ElasticAcquireFuture<T> {
190 fn new(strategy: Arc<ElasticStrategy<T>>) -> Self {
191 let semaphore = PollSemaphore::new(Arc::clone(&strategy.available));
192 Self {
193 strategy: Some(strategy),
194 waiting_slow: false,
195 semaphore,
196 }
197 }
198}
199
200impl<T> Future for ElasticAcquireFuture<T>
201where
202 T: Poolable + 'static,
203{
204 type Output = T;
205
206 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207 let this = self.project();
208 let strategy = this.strategy.take().unwrap();
209
210 while !*this.waiting_slow {
216 if let Ok(permit) = strategy.available.clone().try_acquire_owned() {
221 trace!("Acquired permit on fast path. Acquiring item from pool.");
222
223 let data = strategy.acquire_item(permit);
224
225 return Poll::Ready(T::from_data(strategy, data));
226 }
227
228 trace!("No available permits. Attempting to allocate on demand.");
229
230 let active = strategy.active.load(Acquire);
232 if active == strategy.max_capacity {
233 trace!("Pool at capacity. Falling back to waiting for next available permit.");
234
235 *this.waiting_slow = true;
236 break;
237 }
238
239 if strategy
242 .active
243 .compare_exchange_weak(active, active + 1, AcqRel, Relaxed)
244 .is_err()
245 {
246 continue;
247 }
248
249 trace!("Updated active count. Allocating on demand.");
250
251 let new_item = {
252 let _entered = strategy.alloc_group.enter();
253 (strategy.builder)()
254 };
255
256 strategy.on_demand_allocs.fetch_add(1, Relaxed);
257 strategy.metrics.created().increment(1);
258 strategy.metrics.capacity().increment(1.0);
259 strategy.metrics.in_use().increment(1.0);
260
261 return Poll::Ready(T::from_data(strategy, new_item));
262 }
263
264 trace!("Waiting for next available permit.");
265 match this.semaphore.poll_acquire(cx) {
266 Poll::Ready(Some(permit)) => {
267 trace!("Acquired permit. Acquiring item from pool.");
268
269 let data = strategy.acquire_item(permit);
270
271 Poll::Ready(T::from_data(strategy, data))
272 }
273 Poll::Ready(None) => unreachable!("semaphore should never be closed"),
274 Poll::Pending => {
275 trace!("Permit not yet available. Waiting for next available permit.");
276
277 this.strategy.replace(strategy);
278 Poll::Pending
279 }
280 }
281 }
282}
283
284async fn run_background_shrinker<T: Poolable>(strategy: Arc<ElasticStrategy<T>>) {
285 let mut average_demand = Ewma::new(0.1);
298 let min_capacity = strategy.min_capacity;
299
300 loop {
301 debug!("Shrinker sleeping.");
302 sleep(SHRINKER_SLEEP_DURATION).await;
303
304 let active = strategy.active.load(Relaxed);
309 if active <= min_capacity {
310 debug!("Object pool already at minimum capacity. Nothing to do.");
311 continue;
312 }
313
314 let delta_demand = strategy.on_demand_allocs.swap(0, Relaxed);
315 average_demand.update(delta_demand as f64);
316 if average_demand.value() < 1.0 {
317 debug!(
318 avg_demand = average_demand.value(),
319 active, min_capacity, "Pool qualifies for shrinking. Attempting to remove single item..."
320 );
321
322 let permit = strategy
324 .available
325 .acquire()
326 .await
327 .expect("semaphore should never be closed");
328
329 let item = {
331 let item = strategy.items.lock().unwrap().pop_back().unwrap();
332 strategy.active.fetch_sub(1, AcqRel);
333 item
334 };
335
336 drop(item);
338 strategy.metrics.deleted().increment(1);
339 strategy.metrics.capacity().decrement(1.0);
340
341 permit.forget();
343
344 debug!("Shrinker successfully removed an item from the pool.");
345 } else {
346 debug!(
347 avg_demand = average_demand.value(),
348 active, min_capacity, "Pool does not qualify for shrinking."
349 );
350 }
351 }
352}
353
354struct Ewma {
355 value: f64,
356 alpha: f64,
357}
358
359impl Ewma {
360 fn new(alpha: f64) -> Self {
361 Self { value: 0.0, alpha }
362 }
363
364 fn update(&mut self, new_value: f64) {
365 self.value = (1.0 - self.alpha) * self.value + self.alpha * new_value;
366 }
367
368 fn value(&self) -> f64 {
369 self.value
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use tokio_test::{assert_pending, assert_ready, task::spawn};
376
377 use super::ElasticObjectPool;
378 use crate::{pooled, pooling::ObjectPool as _};
379
380 pooled! {
381 struct TestObject {
382 value: u32,
383 }
384
385 clear => |this| this.value = 0
386 }
387
388 impl std::fmt::Debug for TestObject {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 f.debug_struct("TestObject").finish_non_exhaustive()
391 }
392 }
393
394 #[test]
395 fn basic() {
396 let (pool, _) = ElasticObjectPool::<TestObject>::with_capacity("test", 1, 2);
397 assert_eq!(pool.strategy.available.available_permits(), 1);
398
399 let mut acquire = spawn(pool.acquire());
400 let item = assert_ready!(acquire.poll());
401 assert_eq!(pool.strategy.available.available_permits(), 0);
402
403 drop(item);
404 assert_eq!(pool.strategy.available.available_permits(), 1);
405 }
406
407 #[test]
408 fn burst_allocation() {
409 let (pool, _) = ElasticObjectPool::<TestObject>::with_capacity("test", 1, 2);
410 assert_eq!(pool.strategy.available.available_permits(), 1);
411
412 let mut first_acquire = spawn(pool.acquire());
414 let first_item = assert_ready!(first_acquire.poll());
415 assert_eq!(pool.strategy.available.available_permits(), 0);
416
417 let mut second_acquire = spawn(pool.acquire());
419 let second_item = assert_ready!(second_acquire.poll());
420 assert_eq!(pool.strategy.available.available_permits(), 0);
421
422 let mut third_acquire = spawn(pool.acquire());
424 assert_pending!(third_acquire.poll());
425 assert!(!third_acquire.is_woken());
426
427 drop(first_item);
430 assert_eq!(pool.strategy.available.available_permits(), 0);
431 drop(second_item);
432 assert_eq!(pool.strategy.available.available_permits(), 1);
433
434 assert!(third_acquire.is_woken());
436 let third_item = assert_ready!(third_acquire.poll());
437 assert_eq!(pool.strategy.available.available_permits(), 1);
438
439 drop(third_item);
440 assert_eq!(pool.strategy.available.available_permits(), 2);
441 }
442}