saluki_core/pooling/
fixed.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    pin::Pin,
5    sync::{Arc, Mutex},
6    task::{ready, Context, Poll},
7};
8
9use pin_project::pin_project;
10use tokio::sync::Semaphore;
11use tokio_util::sync::PollSemaphore;
12
13use super::{Clearable, ObjectPool, PoolMetrics, Poolable, ReclaimStrategy};
14
15/// A fixed-size object pool.
16///
17/// All items for this object pool are created up front, and when the object pool is empty, calls to `acquire` will
18/// block until an item is returned to the pool.
19///
20/// ## Metrics
21///
22/// - `object_pool.acquired{pool_name="<pool_name>"}` - total count of the number of objects acquired from the pool (counter)
23/// - `object_pool.released{pool_name="<pool_name>"}` - total count of the number of objects released back to the pool (counter)
24/// - `object_pool.in_use{pool_name="<pool_name>"}` - number of objects from the pool that are currently in use (gauge)
25pub struct FixedSizeObjectPool<T: Poolable> {
26    strategy: Arc<FixedSizeStrategy<T>>,
27}
28
29impl<T: Poolable> FixedSizeObjectPool<T>
30where
31    T::Data: Default,
32{
33    /// Creates a new `FixedSizeObjectPool` with the given capacity.
34    ///
35    /// Metrics are emitted for the pool with a tag (`pool_name`) that is set to the value of the given pool name.
36    pub fn with_capacity<S>(pool_name: S, capacity: usize) -> Self
37    where
38        S: Into<String>,
39    {
40        Self {
41            strategy: Arc::new(FixedSizeStrategy::new(pool_name, capacity)),
42        }
43    }
44}
45
46impl<T: Poolable> FixedSizeObjectPool<T> {
47    /// Creates a new `FixedSizeObjectPool` with the given capacity and item builder.
48    ///
49    /// `builder` is called to construct each item.
50    ///
51    /// Metrics are emitted for the pool with a tag (`pool_name`) that is set to the value of the given pool name.
52    pub fn with_builder<S, B>(pool_name: S, capacity: usize, builder: B) -> Self
53    where
54        S: Into<String>,
55        B: Fn() -> T::Data,
56    {
57        Self {
58            strategy: Arc::new(FixedSizeStrategy::with_builder(pool_name, capacity, builder)),
59        }
60    }
61}
62
63impl<T: Poolable> Clone for FixedSizeObjectPool<T> {
64    fn clone(&self) -> Self {
65        Self {
66            strategy: self.strategy.clone(),
67        }
68    }
69}
70
71impl<T> ObjectPool for FixedSizeObjectPool<T>
72where
73    T: Poolable + Send + Unpin + 'static,
74{
75    type Item = T;
76    type AcquireFuture = FixedSizeAcquireFuture<T>;
77
78    fn acquire(&self) -> Self::AcquireFuture {
79        FixedSizeStrategy::acquire(&self.strategy)
80    }
81}
82
83struct FixedSizeStrategy<T: Poolable> {
84    items: Mutex<VecDeque<T::Data>>,
85    available: Arc<Semaphore>,
86    metrics: PoolMetrics,
87}
88
89impl<T> FixedSizeStrategy<T>
90where
91    T: Poolable,
92    T::Data: Default,
93{
94    fn new<S>(pool_name: S, capacity: usize) -> Self
95    where
96        S: Into<String>,
97    {
98        let mut items = VecDeque::with_capacity(capacity);
99        items.extend((0..capacity).map(|_| T::Data::default()));
100        let available = Arc::new(Semaphore::new(capacity));
101
102        Self {
103            items: Mutex::new(items),
104            available,
105            metrics: PoolMetrics::new(pool_name.into()),
106        }
107    }
108}
109
110impl<T: Poolable> FixedSizeStrategy<T> {
111    fn with_builder<S, B>(pool_name: S, capacity: usize, builder: B) -> Self
112    where
113        S: Into<String>,
114        B: Fn() -> T::Data,
115    {
116        let mut items = VecDeque::with_capacity(capacity);
117        items.extend((0..capacity).map(|_| builder()));
118        let available = Arc::new(Semaphore::new(capacity));
119
120        let metrics = PoolMetrics::new(pool_name.into());
121        metrics.created().increment(capacity as u64);
122        metrics.capacity().set(capacity as f64);
123
124        Self {
125            items: Mutex::new(items),
126            available,
127            metrics,
128        }
129    }
130}
131
132impl<T> FixedSizeStrategy<T>
133where
134    T: Poolable,
135    T::Data: Send + 'static,
136{
137    fn acquire(strategy: &Arc<Self>) -> FixedSizeAcquireFuture<T> {
138        FixedSizeAcquireFuture::new(Arc::clone(strategy))
139    }
140}
141
142impl<T: Poolable> ReclaimStrategy<T> for FixedSizeStrategy<T> {
143    fn reclaim(&self, mut data: T::Data) {
144        data.clear();
145
146        self.items.lock().unwrap().push_back(data);
147        self.available.add_permits(1);
148        self.metrics.released().increment(1);
149        self.metrics.in_use().decrement(1.0);
150    }
151}
152
153#[pin_project]
154pub struct FixedSizeAcquireFuture<T: Poolable> {
155    strategy: Option<Arc<FixedSizeStrategy<T>>>,
156    semaphore: PollSemaphore,
157}
158
159impl<T: Poolable> FixedSizeAcquireFuture<T> {
160    fn new(strategy: Arc<FixedSizeStrategy<T>>) -> Self {
161        let semaphore = PollSemaphore::new(Arc::clone(&strategy.available));
162        Self {
163            strategy: Some(strategy),
164            semaphore,
165        }
166    }
167}
168
169impl<T> Future for FixedSizeAcquireFuture<T>
170where
171    T: Poolable + 'static,
172{
173    type Output = T;
174
175    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
176        let this = self.project();
177
178        match ready!(this.semaphore.poll_acquire(cx)) {
179            Some(permit) => {
180                permit.forget();
181
182                let strategy = this.strategy.take().unwrap();
183                let data = strategy.items.lock().unwrap().pop_back().unwrap();
184                strategy.metrics.acquired().increment(1);
185                strategy.metrics.in_use().increment(1.0);
186                Poll::Ready(T::from_data(strategy, data))
187            }
188            None => unreachable!("semaphore should never be closed"),
189        }
190    }
191}