saluki_core/pooling/
elastic.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    pin::Pin,
5    sync::{
6        atomic::{
7            AtomicUsize,
8            Ordering::{AcqRel, Acquire, Relaxed},
9        },
10        Arc, Mutex,
11    },
12    task::{Context, Poll},
13    time::Duration,
14};
15
16use memory_accounting::allocator::AllocationGroupToken;
17use pin_project::pin_project;
18use tokio::{
19    sync::{OwnedSemaphorePermit, Semaphore},
20    time::sleep,
21};
22use tokio_util::sync::PollSemaphore;
23use tracing::{debug, trace};
24
25use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
26
/// How long the background shrinker sleeps between two consecutive shrink evaluations.
const SHRINKER_SLEEP_DURATION: Duration = Duration::from_secs(1);
28
29/// An elastic object pool.
30///
31/// Pools are configured with a minimum and maximum size, and allocate the minimum number of items up front. When an
32/// item is requested and the pool is empty, but has not yet reached its maximum size, it will allocate the item on
33/// demand.
34///
35/// Periodically, a background task will evaluate the utilization of the pool and shrink the pool size in order to
36/// attempt to size it more closely to the recent demand. The frequency of this shrinking, as well as how usage demand
37/// is captured and rolled off, is configurable.
38///
39/// # Missing
40///
41/// - Actual configurability around the shrinking frequency and usage demand roll-off.
42pub struct ElasticObjectPool<T: Poolable> {
43    strategy: Arc<ElasticStrategy<T>>,
44}
45
46impl<T> ElasticObjectPool<T>
47where
48    T: Poolable + 'static,
49    T::Data: Default,
50{
51    /// Creates a new `ElasticObjectPool` with the given minimum and maximum capacity.
52    pub fn with_capacity<S>(pool_name: S, min_capacity: usize, max_capacity: usize) -> (Self, impl Future<Output = ()>)
53    where
54        S: Into<String>,
55    {
56        Self::with_builder(pool_name, min_capacity, max_capacity, T::Data::default)
57    }
58}
59
60impl<T> ElasticObjectPool<T>
61where
62    T: Poolable + 'static,
63{
64    /// Creates a new `ElasticObjectPool` with the given minimum and maximum capacity and item builder.
65    ///
66    /// `builder` is called to construct each item.
67    pub fn with_builder<S, B>(
68        pool_name: S, min_capacity: usize, max_capacity: usize, builder: B,
69    ) -> (Self, impl Future<Output = ()>)
70    where
71        S: Into<String>,
72        B: Fn() -> T::Data + Send + Sync + 'static,
73    {
74        let strategy = Arc::new(ElasticStrategy::with_builder(
75            pool_name,
76            min_capacity,
77            max_capacity,
78            builder,
79        ));
80        let shrinker = run_background_shrinker(Arc::clone(&strategy));
81
82        (Self { strategy }, shrinker)
83    }
84}
85
86impl<T: Poolable> Clone for ElasticObjectPool<T> {
87    fn clone(&self) -> Self {
88        Self {
89            strategy: self.strategy.clone(),
90        }
91    }
92}
93
94impl<T> ObjectPool for ElasticObjectPool<T>
95where
96    T: Poolable + Send + Unpin + 'static,
97{
98    type Item = T;
99    type AcquireFuture = ElasticAcquireFuture<T>;
100
101    fn acquire(&self) -> Self::AcquireFuture {
102        ElasticStrategy::acquire(&self.strategy)
103    }
104}
105
106struct ElasticStrategy<T: Poolable> {
107    items: Mutex<VecDeque<T::Data>>,
108    builder: Box<dyn Fn() -> T::Data + Send + Sync>,
109    available: Arc<Semaphore>,
110    active: AtomicUsize,
111    on_demand_allocs: AtomicUsize,
112    min_capacity: usize,
113    max_capacity: usize,
114    alloc_group: AllocationGroupToken,
115    metrics: PoolMetrics,
116}
117
118impl<T: Poolable> ElasticStrategy<T> {
119    fn with_builder<S, B>(pool_name: S, min_capacity: usize, max_capacity: usize, builder: B) -> Self
120    where
121        S: Into<String>,
122        B: Fn() -> T::Data + Send + Sync + 'static,
123    {
124        let builder = Box::new(builder);
125
126        // Allocate enough storage to hold the maximum number of items, but only _build_ the minimum number of items.
127        let mut items = VecDeque::with_capacity(max_capacity);
128        items.extend((0..min_capacity).map(|_| builder()));
129        let available = Arc::new(Semaphore::new(min_capacity));
130
131        let metrics = PoolMetrics::new(pool_name.into());
132        metrics.capacity().set(min_capacity as f64);
133        metrics.created().increment(min_capacity as u64);
134
135        Self {
136            items: Mutex::new(items),
137            builder,
138            available,
139            active: AtomicUsize::new(min_capacity),
140            on_demand_allocs: AtomicUsize::new(0),
141            min_capacity,
142            max_capacity,
143            alloc_group: AllocationGroupToken::current(),
144            metrics,
145        }
146    }
147
148    fn acquire_item(&self, permit: OwnedSemaphorePermit) -> T::Data {
149        permit.forget();
150
151        let data = { self.items.lock().unwrap().pop_back().unwrap() };
152
153        self.metrics.acquired().increment(1);
154        self.metrics.in_use().increment(1.0);
155
156        data
157    }
158}
159
160impl<T> ElasticStrategy<T>
161where
162    T: Poolable,
163    T::Data: Send + 'static,
164{
165    fn acquire(strategy: &Arc<Self>) -> ElasticAcquireFuture<T> {
166        ElasticAcquireFuture::new(Arc::clone(strategy))
167    }
168}
169
170impl<T: Poolable> ReclaimStrategy<T> for ElasticStrategy<T> {
171    fn reclaim(&self, mut data: T::Data) {
172        data.clear();
173
174        self.items.lock().unwrap().push_back(data);
175        self.available.add_permits(1);
176        self.metrics.released().increment(1);
177        self.metrics.in_use().decrement(1.0);
178    }
179}
180
181/// A [`Future`] that acquires an item from an [`ElasticObjectPool`].
182#[pin_project]
183pub struct ElasticAcquireFuture<T: Poolable> {
184    strategy: Option<Arc<ElasticStrategy<T>>>,
185    waiting_slow: bool,
186    semaphore: PollSemaphore,
187}
188
189impl<T: Poolable> ElasticAcquireFuture<T> {
190    fn new(strategy: Arc<ElasticStrategy<T>>) -> Self {
191        let semaphore = PollSemaphore::new(Arc::clone(&strategy.available));
192        Self {
193            strategy: Some(strategy),
194            waiting_slow: false,
195            semaphore,
196        }
197    }
198}
199
200impl<T> Future for ElasticAcquireFuture<T>
201where
202    T: Poolable + 'static,
203{
204    type Output = T;
205
206    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
207        let this = self.project();
208        let strategy = this.strategy.take().unwrap();
209
210        // If we're not waiting from previously going down the slow path, we'll try to acquire a permit in a non-polling
211        // fashion, which avoids scheduling a wakeup for this task if we end up trying to allocate an item on demand.
212        //
213        // If we _are_ waiting from previously going down the slow path, we'll always poll the semaphore to ensure we
214        // don't throw away a permit that was given to us while we were waiting.
215        while !*this.waiting_slow {
216            // Fast path: try to acquire a permit immediately.
217            //
218            // If we get one, we can acquire an item from the pool. Otherwise, we'll attempt to do a burst allocation if
219            // the pool hasn't yet reached its maximum capacity.
220            if let Ok(permit) = strategy.available.clone().try_acquire_owned() {
221                trace!("Acquired permit on fast path. Acquiring item from pool.");
222
223                let data = strategy.acquire_item(permit);
224
225                return Poll::Ready(T::from_data(strategy, data));
226            }
227
228            trace!("No available permits. Attempting to allocate on demand.");
229
230            // If we're at capacity, we can't allocate any more items.
231            let active = strategy.active.load(Acquire);
232            if active == strategy.max_capacity {
233                trace!("Pool at capacity. Falling back to waiting for next available permit.");
234
235                *this.waiting_slow = true;
236                break;
237            }
238
239            // Try to atomically increment `active` which signals that we still have capacity to allocate another
240            // item, and more importantly, that _we_ are authorized to do so.
241            if strategy
242                .active
243                .compare_exchange_weak(active, active + 1, AcqRel, Relaxed)
244                .is_err()
245            {
246                continue;
247            }
248
249            trace!("Updated active count. Allocating on demand.");
250
251            let new_item = {
252                let _entered = strategy.alloc_group.enter();
253                (strategy.builder)()
254            };
255
256            strategy.on_demand_allocs.fetch_add(1, Relaxed);
257            strategy.metrics.created().increment(1);
258            strategy.metrics.capacity().increment(1.0);
259            strategy.metrics.in_use().increment(1.0);
260
261            return Poll::Ready(T::from_data(strategy, new_item));
262        }
263
264        trace!("Waiting for next available permit.");
265        match this.semaphore.poll_acquire(cx) {
266            Poll::Ready(Some(permit)) => {
267                trace!("Acquired permit. Acquiring item from pool.");
268
269                let data = strategy.acquire_item(permit);
270
271                Poll::Ready(T::from_data(strategy, data))
272            }
273            Poll::Ready(None) => unreachable!("semaphore should never be closed"),
274            Poll::Pending => {
275                trace!("Permit not yet available. Waiting for next available permit.");
276
277                this.strategy.replace(strategy);
278                Poll::Pending
279            }
280        }
281    }
282}
283
284async fn run_background_shrinker<T: Poolable>(strategy: Arc<ElasticStrategy<T>>) {
285    // The shrinker continuously tracks the "demand" for items in the pool, and attempts to shrink the pool size such
286    // that it stays at the smallest possible size while minimizing the amount of on-demand allocations seen.
287    //
288    // Every time the shrinker runs, it consumes the current value of `fallback_count`, which gives us a delta of the
289    // number of on-demand allocations that have occurred since the last time the shrinker ran, or simply "demand". We
290    // track a rolling average of this demand, and if the average demand is less than N, where N is configurable, then
291    // we consider the pool eligible to shrink.
292    //
293    // We only shrink the pool down to its minimum capacity, even if the average demand is less than N. When the pool is
294    // eligible to shrink and not yet at the minimum capacity, we remove a single item from the pool per iteration.
295
296    // We use an alpha of 0.1, which provides a fairly strong smoothing effect.
297    let mut average_demand = Ewma::new(0.1);
298    let min_capacity = strategy.min_capacity;
299
300    loop {
301        debug!("Shrinker sleeping.");
302        sleep(SHRINKER_SLEEP_DURATION).await;
303
304        // Track the number of available permits, and the number of active items.
305        //
306        // When we have available permits, and our active count is greater than the minimum capacity, we'll take an item
307        // from the pool.
308        let active = strategy.active.load(Relaxed);
309        if active <= min_capacity {
310            debug!("Object pool already at minimum capacity. Nothing to do.");
311            continue;
312        }
313
314        let delta_demand = strategy.on_demand_allocs.swap(0, Relaxed);
315        average_demand.update(delta_demand as f64);
316        if average_demand.value() < 1.0 {
317            debug!(
318                avg_demand = average_demand.value(),
319                active, min_capacity, "Pool qualifies for shrinking. Attempting to remove single item..."
320            );
321
322            // Try acquiring a permit from the pool, giving us permission to remove an item.
323            let permit = strategy
324                .available
325                .acquire()
326                .await
327                .expect("semaphore should never be closed");
328
329            // Lock the pool and remove an item, taking care to update the active count while holding the lock.
330            let item = {
331                let item = strategy.items.lock().unwrap().pop_back().unwrap();
332                strategy.active.fetch_sub(1, AcqRel);
333                item
334            };
335
336            // Drop the item itself, and update our metrics.
337            drop(item);
338            strategy.metrics.deleted().increment(1);
339            strategy.metrics.capacity().decrement(1.0);
340
341            // Forget the permit so that we shrink the overall number of permits attached to the semaphore.
342            permit.forget();
343
344            debug!("Shrinker successfully removed an item from the pool.");
345        } else {
346            debug!(
347                avg_demand = average_demand.value(),
348                active, min_capacity, "Pool does not qualify for shrinking."
349            );
350        }
351    }
352}
353
/// An exponentially weighted moving average.
struct Ewma {
    /// The current average.
    value: f64,
    /// Weight given to each new sample; the previous average is weighted by `1.0 - alpha`.
    alpha: f64,
}

impl Ewma {
    /// Creates an average that starts at zero and uses the given smoothing factor.
    fn new(alpha: f64) -> Self {
        let value = 0.0;
        Self { value, alpha }
    }

    /// Folds `new_value` into the average.
    fn update(&mut self, new_value: f64) {
        let retained = (1.0 - self.alpha) * self.value;
        let incoming = self.alpha * new_value;
        self.value = retained + incoming;
    }

    /// Returns the current average.
    fn value(&self) -> f64 {
        self.value
    }
}
372
#[cfg(test)]
mod tests {
    use tokio_test::{assert_pending, assert_ready, task::spawn};

    use super::ElasticObjectPool;
    use crate::{pooled, pooling::ObjectPool as _};

    pooled! {
        struct TestObject {
            value: u32,
        }

        clear => |this| this.value = 0
    }

    impl std::fmt::Debug for TestObject {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestObject").finish_non_exhaustive()
        }
    }

    /// Number of permits currently available on the pool's semaphore.
    fn idle_permits(pool: &ElasticObjectPool<TestObject>) -> usize {
        pool.strategy.available.available_permits()
    }

    #[test]
    fn basic() {
        let (pool, _) = ElasticObjectPool::<TestObject>::with_capacity("test", 1, 2);
        assert_eq!(idle_permits(&pool), 1);

        // Taking the only pre-built item consumes the only permit.
        let mut pending = spawn(pool.acquire());
        let acquired = assert_ready!(pending.poll());
        assert_eq!(idle_permits(&pool), 0);

        // Dropping the item hands it, and its permit, back to the pool.
        drop(acquired);
        assert_eq!(idle_permits(&pool), 1);
    }

    #[test]
    fn burst_allocation() {
        let (pool, _) = ElasticObjectPool::<TestObject>::with_capacity("test", 1, 2);
        assert_eq!(idle_permits(&pool), 1);

        // The first acquire is served by the item that was built up front.
        let mut acquire_a = spawn(pool.acquire());
        let item_a = assert_ready!(acquire_a.poll());
        assert_eq!(idle_permits(&pool), 0);

        // The second acquire finds no idle item, but the pool is still below its maximum capacity, so the item is
        // allocated on demand. On-demand items don't go through the semaphore, so the permit count doesn't move.
        let mut acquire_b = spawn(pool.acquire());
        let item_b = assert_ready!(acquire_b.poll());
        assert_eq!(idle_permits(&pool), 0);

        // The third acquire has to wait, as the pool is now at its maximum capacity.
        let mut acquire_c = spawn(pool.acquire());
        assert_pending!(acquire_c.poll());
        assert!(!acquire_c.is_woken());

        // Return both items. The permit for the first returned item is diverted by the semaphore to the pending
        // third acquire, so only the second return shows up as an available permit.
        drop(item_a);
        assert_eq!(idle_permits(&pool), 0);
        drop(item_b);
        assert_eq!(idle_permits(&pool), 1);

        // The third acquire should have been woken up by now, and should complete when polled again.
        assert!(acquire_c.is_woken());
        let item_c = assert_ready!(acquire_c.poll());
        assert_eq!(idle_permits(&pool), 1);

        drop(item_c);
        assert_eq!(idle_permits(&pool), 2);
    }
}