saluki_common/sync/
shutdown.rs1use std::{
4 future::Future,
5 pin::Pin,
6 sync::{
7 atomic::{
8 AtomicBool, AtomicUsize,
9 Ordering::{AcqRel, Acquire, Release},
10 },
11 Arc,
12 },
13 task::{Context, Poll},
14};
15
16use pin_project::{pin_project, pinned_drop};
17use tokio::sync::{futures::OwnedNotified, Notify};
18
19#[derive(Default)]
20struct ShutdownInner {
21 flag: AtomicBool,
22 notify: Arc<Notify>,
23 outstanding: AtomicUsize,
24 all_dropped: Notify,
25}
26
27impl ShutdownInner {
28 fn register(&self) {
30 self.outstanding.fetch_add(1, Release);
31 }
32
33 fn deregister(&self) {
35 if self.outstanding.fetch_sub(1, AcqRel) == 1 {
36 self.all_dropped.notify_waiters();
39 }
40 }
41
42 fn signal(&self) {
44 if !self.flag.swap(true, AcqRel) {
45 self.notify.notify_waiters();
46 }
47 }
48
49 async fn signal_and_await_handles(&self) {
53 let all_dropped = self.all_dropped.notified();
56
57 self.signal();
58
59 if self.outstanding.load(Acquire) == 0 {
60 return;
61 }
62
63 all_dropped.await;
64 }
65}
66
67#[derive(Default)]
83pub struct ShutdownCoordinator {
84 state: Arc<ShutdownInner>,
85}
86
87impl ShutdownCoordinator {
88 pub fn register(&mut self) -> ShutdownHandle {
90 ShutdownHandle::registered(Arc::clone(&self.state))
91 }
92
93 pub fn shutdown(self) {
98 self.state.signal();
99 }
100
101 pub async fn shutdown_and_wait(self) {
105 self.state.signal_and_await_handles().await;
106 }
107}
108
109impl Drop for ShutdownCoordinator {
110 fn drop(&mut self) {
111 self.state.signal();
112 }
113}
114
115#[pin_project(PinnedDrop)]
121pub struct ShutdownHandle {
122 state: Arc<ShutdownInner>,
123
124 #[pin]
125 notified: OwnedNotified,
126}
127
128impl ShutdownHandle {
129 fn registered(state: Arc<ShutdownInner>) -> Self {
131 state.register();
132
133 let notify = Arc::clone(&state.notify);
138 let notified = notify.notified_owned();
139
140 Self { state, notified }
141 }
142
143 pub fn noop() -> Self {
147 Self::registered(Arc::new(ShutdownInner::default()))
148 }
149
150 pub fn paired() -> (ShutdownCoordinator, Self) {
155 let mut coordinator = ShutdownCoordinator::default();
156 let handle = coordinator.register();
157 (coordinator, handle)
158 }
159
160 pub fn is_triggered(&self) -> bool {
162 self.state.flag.load(Acquire)
163 }
164}
165
166impl Future for ShutdownHandle {
167 type Output = ();
168
169 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170 let this = self.project();
171
172 if this.state.flag.load(Acquire) {
174 return Poll::Ready(());
175 }
176
177 this.notified.poll(cx)
183 }
184}
185
186#[pinned_drop]
187impl PinnedDrop for ShutdownHandle {
188 fn drop(self: Pin<&mut Self>) {
189 self.state.deregister();
190 }
191}
192
#[cfg(test)]
mod tests {
    use tokio_test::{assert_pending, assert_ready, task::spawn};

    use super::*;

    #[test]
    fn handle_starts_untriggered() {
        // The coordinator is bound (not discarded) so it stays alive; dropping it would signal.
        let (_coordinator_guard, fresh) = ShutdownHandle::paired();

        assert!(!fresh.is_triggered());
    }

    #[test]
    fn handle_triggered_on_coordinator_shutdown() {
        let (owner, observer) = ShutdownHandle::paired();
        assert!(!observer.is_triggered());

        owner.shutdown();
        assert!(observer.is_triggered());
    }

    #[test]
    fn handle_triggered_on_coordinator_drop() {
        let (owner, observer) = ShutdownHandle::paired();
        assert!(!observer.is_triggered());

        drop(owner);
        assert!(observer.is_triggered());
    }

    #[test]
    fn handle_wait_returns_immediately_when_already_triggered() {
        let (owner, observer) = ShutdownHandle::paired();
        owner.shutdown();

        // Shutdown happened before the first poll, so that first poll must already be ready.
        let mut waiting = spawn(observer);
        assert_ready!(waiting.poll());
    }

    #[test]
    fn handle_completes_after_coordinator_shutdown() {
        let (owner, observer) = ShutdownHandle::paired();

        let mut waiting = spawn(observer);
        assert_pending!(waiting.poll());
        assert!(!waiting.is_woken());

        // Explicit shutdown must wake the pending handle.
        owner.shutdown();
        assert!(waiting.is_woken());
        assert_ready!(waiting.poll());
    }

    #[test]
    fn handle_completes_after_coordinator_dropped() {
        let (owner, observer) = ShutdownHandle::paired();

        let mut waiting = spawn(observer);
        assert_pending!(waiting.poll());
        assert!(!waiting.is_woken());

        // Dropping the coordinator must wake the pending handle just like an explicit shutdown.
        drop(owner);
        assert!(waiting.is_woken());
        assert_ready!(waiting.poll());
    }

    #[test]
    fn handle_is_fused_after_coordinator_shutdown() {
        let (owner, observer) = ShutdownHandle::paired();
        owner.shutdown();

        // Polling again after completion must keep yielding ready.
        let mut waiting = spawn(observer);
        assert_ready!(waiting.poll());
        assert_ready!(waiting.poll());
    }

    #[test]
    fn multiple_handles_all_wake_on_shutdown() {
        let mut owner = ShutdownCoordinator::default();
        let first = owner.register();
        let second = owner.register();
        let third = owner.register();

        let mut waiting_first = spawn(first);
        let mut waiting_second = spawn(second);
        let mut waiting_third = spawn(third);

        assert_pending!(waiting_first.poll());
        assert_pending!(waiting_second.poll());
        assert_pending!(waiting_third.poll());

        // A single shutdown call must wake every pending handle.
        owner.shutdown();
        assert!(waiting_first.is_woken());
        assert!(waiting_second.is_woken());
        assert!(waiting_third.is_woken());

        assert_ready!(waiting_first.poll());
        assert_ready!(waiting_second.poll());
        assert_ready!(waiting_third.poll());
    }

    #[test]
    fn noop_handle_never_completes() {
        let detached = ShutdownHandle::noop();

        assert!(!detached.is_triggered());

        // No coordinator exists for this handle, so repeated polls stay pending and unwoken.
        let mut waiting = spawn(detached);
        assert_pending!(waiting.poll());
        assert_pending!(waiting.poll());
        assert!(!waiting.is_woken());
    }

    #[test]
    fn coordinator_shutdown_and_wait_returns_immediately_with_no_handles() {
        let owner = ShutdownCoordinator::default();

        // Nothing was ever registered, so there is nothing to wait for.
        let mut stopping = spawn(owner.shutdown_and_wait());
        assert_ready!(stopping.poll());
    }

    #[test]
    fn coordinator_shutdown_and_wait_ignores_handle_dropped_before_shutdown() {
        let mut owner = ShutdownCoordinator::default();

        // Register and immediately release a handle: it must no longer count as outstanding.
        let short_lived = owner.register();
        drop(short_lived);

        let mut stopping = spawn(owner.shutdown_and_wait());
        assert_ready!(stopping.poll());
    }

    #[test]
    fn coordinator_shutdown_waits_for_outstanding_handle_to_drop_single() {
        let mut owner = ShutdownCoordinator::default();
        let observer = owner.register();

        let mut waiting = spawn(observer);
        assert!(!waiting.is_woken());
        assert_pending!(waiting.poll());

        // The first poll of the shutdown future sends the signal, but the handle still exists.
        let mut stopping = spawn(owner.shutdown_and_wait());
        assert!(!stopping.is_woken());
        assert_pending!(stopping.poll());
        assert!(!stopping.is_woken());

        // The handle observes the signal and completes...
        assert!(waiting.is_woken());
        assert_ready!(waiting.poll());

        // ...but completing is not dropping, so the coordinator keeps waiting.
        assert_pending!(stopping.poll());
        assert!(!stopping.is_woken());

        drop(waiting);

        // Only the drop of the handle lets the coordinator finish.
        assert!(stopping.is_woken());
        assert_ready!(stopping.poll());
    }

    #[test]
    fn coordinator_shutdown_waits_for_outstanding_handle_to_drop_multiple() {
        let mut owner = ShutdownCoordinator::default();
        let first = owner.register();
        let second = owner.register();

        let mut waiting_first = spawn(first);
        let mut waiting_second = spawn(second);

        assert!(!waiting_first.is_woken());
        assert!(!waiting_second.is_woken());
        assert_pending!(waiting_first.poll());
        assert_pending!(waiting_second.poll());

        // The first poll of the shutdown future sends the signal while both handles still exist.
        let mut stopping = spawn(owner.shutdown_and_wait());
        assert!(!stopping.is_woken());
        assert_pending!(stopping.poll());
        assert!(!stopping.is_woken());

        // Both handles observe the signal and complete.
        assert!(waiting_first.is_woken());
        assert_ready!(waiting_first.poll());

        assert!(waiting_second.is_woken());
        assert_ready!(waiting_second.poll());

        assert!(!stopping.is_woken());
        assert_pending!(stopping.poll());

        // Dropping one of two handles is not enough.
        drop(waiting_first);

        assert!(!stopping.is_woken());
        assert_pending!(stopping.poll());

        // Dropping the last handle lets the coordinator finish.
        drop(waiting_second);

        assert!(stopping.is_woken());
        assert_ready!(stopping.poll());
    }
}