Skip to main content

saluki_common/sync/
shutdown.rs

1//! Primitives for signalling shutdown.
2
3use std::{
4    future::Future,
5    pin::Pin,
6    sync::{
7        atomic::{
8            AtomicBool, AtomicUsize,
9            Ordering::{AcqRel, Acquire, Release},
10        },
11        Arc,
12    },
13    task::{Context, Poll},
14};
15
16use pin_project::{pin_project, pinned_drop};
17use tokio::sync::{futures::OwnedNotified, Notify};
18
19#[derive(Default)]
20struct ShutdownInner {
21    flag: AtomicBool,
22    notify: Arc<Notify>,
23    outstanding: AtomicUsize,
24    all_dropped: Notify,
25}
26
27impl ShutdownInner {
28    /// Records a newly created handle.
29    fn register(&self) {
30        self.outstanding.fetch_add(1, Release);
31    }
32
33    /// Records a dropped handle, releasing completion waiters once the last handle goes away.
34    fn deregister(&self) {
35        if self.outstanding.fetch_sub(1, AcqRel) == 1 {
36            // Use `notify_waiters` (rather than storing a permit): the last handle may drop _before_ shutdown is
37            // signaled, and an emitter only cares about this once it's actually waiting in `signal_and_await_handles`.
38            self.all_dropped.notify_waiters();
39        }
40    }
41
42    /// Signals shutdown, releasing any blocked waiters.
43    fn signal(&self) {
44        if !self.flag.swap(true, AcqRel) {
45            self.notify.notify_waiters();
46        }
47    }
48
49    /// Signals shutdown and waits until every outstanding handle has been dropped.
50    ///
51    /// Returns immediately if there are no outstanding handles.
52    async fn signal_and_await_handles(&self) {
53        // Snapshot the completion notification _before_ signaling, so the last handle dropping can't slip through the
54        // gap between signaling and parking.
55        let all_dropped = self.all_dropped.notified();
56
57        self.signal();
58
59        if self.outstanding.load(Acquire) == 0 {
60            return;
61        }
62
63        all_dropped.await;
64    }
65}
66
67/// A shutdown coordinator for triggering, and optionally waiting for, tasks to shut down.
68///
69/// Callers register a [`ShutdownHandle`], which can be awaited to observe shutdown being triggered, or checked
70/// synchronously via [`ShutdownHandle::is_triggered`]. When a handle is dropped, it reports completion back to the
71/// coordinator; this is what allows [`shutdown_and_wait`][Self::shutdown_and_wait] to block until every outstanding
72/// handle has finished.
73///
74/// Shutdown is triggered either fire-and-forget, via [`shutdown`][Self::shutdown], or blocking, via
75/// [`shutdown_and_wait`][Self::shutdown_and_wait].
76///
77/// # Shutdown on drop
78///
79/// Shutdown is also triggered implicitly on drop if it hasn't already been triggered, to enforce that shutdown always
80/// happens even in exceptional circumstances. Shutdown on drop is fire-and-forget: when the coordinator's drop logic
81/// has executed, there is no guarantee that all outstanding handles have also been dropped.
82#[derive(Default)]
83pub struct ShutdownCoordinator {
84    state: Arc<ShutdownInner>,
85}
86
87impl ShutdownCoordinator {
88    /// Registers and returns a new shutdown handle.
89    pub fn register(&mut self) -> ShutdownHandle {
90        ShutdownHandle::registered(Arc::clone(&self.state))
91    }
92
93    /// Signals shutdown to all outstanding handles without waiting.
94    ///
95    /// When it is desirable to wait until all outstanding handles have completed (been dropped), use
96    /// [`shutdown_and_wait`][Self::shutdown_and_wait].
97    pub fn shutdown(self) {
98        self.state.signal();
99    }
100
101    /// Signals shutdown to all outstanding handles, waiting until all handles have completed (been dropped).
102    ///
103    /// If there are no outstanding handles, this returns immediately.
104    pub async fn shutdown_and_wait(self) {
105        self.state.signal_and_await_handles().await;
106    }
107}
108
109impl Drop for ShutdownCoordinator {
110    fn drop(&mut self) {
111        self.state.signal();
112    }
113}
114
115/// A handle for observing shutdown from a [`ShutdownCoordinator`].
116///
117/// The handle is a [`Future`] that resolves once shutdown is triggered, and can also be checked synchronously via
118/// [`is_triggered`][Self::is_triggered]. Dropping the handle reports completion back to its coordinator, which is how
119/// [`ShutdownCoordinator::shutdown_and_wait`] knows when all outstanding work has finished.
120#[pin_project(PinnedDrop)]
121pub struct ShutdownHandle {
122    state: Arc<ShutdownInner>,
123
124    #[pin]
125    notified: OwnedNotified,
126}
127
128impl ShutdownHandle {
129    /// Creates a handle sharing the given state, registering it as an outstanding handle.
130    fn registered(state: Arc<ShutdownInner>) -> Self {
131        state.register();
132
133        // Capture our `OwnedNotified` during registration, which ensures that when we eventually poll it, it will be
134        // guaranteed that we observe any notifications sent _after_ this point.
135        //
136        // We couple that invariant with first checking the atomic flag to ensure we never miss wakeups.
137        let notify = Arc::clone(&state.notify);
138        let notified = notify.notified_owned();
139
140        Self { state, notified }
141    }
142
143    /// Creates a shutdown handle that is never triggered.
144    ///
145    /// This can be useful when a callee requires a handle but no shutdown signal is expected or available to provide.
146    pub fn noop() -> Self {
147        Self::registered(Arc::new(ShutdownInner::default()))
148    }
149
150    /// Creates a coordinator/handle pair.
151    ///
152    /// This is equivalent to creating [`ShutdownCoordinator`] and calling [`ShutdownCoordinator::register`]. The
153    /// resulting coordinator can still be used to register further handles if need be.
154    pub fn paired() -> (ShutdownCoordinator, Self) {
155        let mut coordinator = ShutdownCoordinator::default();
156        let handle = coordinator.register();
157        (coordinator, handle)
158    }
159
160    /// Returns `true` if shutdown has been triggered.
161    pub fn is_triggered(&self) -> bool {
162        self.state.flag.load(Acquire)
163    }
164}
165
166impl Future for ShutdownHandle {
167    type Output = ();
168
169    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170        let this = self.project();
171
172        // Fast path: if shutdown has already been triggered, return immediately.
173        if this.state.flag.load(Acquire) {
174            return Poll::Ready(());
175        }
176
177        // Wait to be notified that shutdown has been triggered.
178        //
179        // We don't need to recheck the flag because we know that by creating `OwnedNotified` before
180        // checking the flag, any subsequent poll we do that returns as ready can only be the result
181        // of the notify happening after setting the flag.
182        this.notified.poll(cx)
183    }
184}
185
186#[pinned_drop]
187impl PinnedDrop for ShutdownHandle {
188    fn drop(self: Pin<&mut Self>) {
189        self.state.deregister();
190    }
191}
192
#[cfg(test)]
mod tests {
    use tokio_test::{assert_pending, assert_ready, task::spawn};

    use super::*;

    #[test]
    fn handle_starts_untriggered() {
        let (_keep_alive, fresh) = ShutdownHandle::paired();

        assert!(!fresh.is_triggered());
    }

    #[test]
    fn handle_triggered_on_coordinator_shutdown() {
        let (coordinator, observer) = ShutdownHandle::paired();
        assert!(!observer.is_triggered());

        coordinator.shutdown();
        assert!(observer.is_triggered());
    }

    #[test]
    fn handle_triggered_on_coordinator_drop() {
        let (coordinator, observer) = ShutdownHandle::paired();
        assert!(!observer.is_triggered());

        drop(coordinator);
        assert!(observer.is_triggered());
    }

    #[test]
    fn handle_wait_returns_immediately_when_already_triggered() {
        let (coordinator, observer) = ShutdownHandle::paired();
        coordinator.shutdown();

        let mut task = spawn(observer);
        assert_ready!(task.poll());
    }

    #[test]
    fn handle_completes_after_coordinator_shutdown() {
        let (coordinator, observer) = ShutdownHandle::paired();

        let mut task = spawn(observer);
        assert_pending!(task.poll());
        assert!(!task.is_woken());

        // An explicit shutdown must wake the parked handle so that its next poll resolves:
        coordinator.shutdown();
        assert!(task.is_woken());
        assert_ready!(task.poll());
    }

    #[test]
    fn handle_completes_after_coordinator_dropped() {
        let (coordinator, observer) = ShutdownHandle::paired();

        let mut task = spawn(observer);
        assert_pending!(task.poll());
        assert!(!task.is_woken());

        // Dropping the coordinator counts as triggering shutdown, so the parked handle must be woken and its next
        // poll must resolve:
        drop(coordinator);
        assert!(task.is_woken());
        assert_ready!(task.poll());
    }

    #[test]
    fn handle_is_fused_after_coordinator_shutdown() {
        let (coordinator, observer) = ShutdownHandle::paired();
        coordinator.shutdown();

        // Because the flag stays set, each poll after shutdown sees it and resolves right away:
        let mut task = spawn(observer);
        assert_ready!(task.poll());
        assert_ready!(task.poll());
    }

    #[test]
    fn multiple_handles_all_wake_on_shutdown() {
        let mut coordinator = ShutdownCoordinator::default();
        let mut tasks = [
            spawn(coordinator.register()),
            spawn(coordinator.register()),
            spawn(coordinator.register()),
        ];

        for task in &mut tasks {
            assert_pending!(task.poll());
        }

        // A single shutdown must wake every parked handle at once, after which each of them resolves:
        coordinator.shutdown();
        for task in &tasks {
            assert!(task.is_woken());
        }

        for task in &mut tasks {
            assert_ready!(task.poll());
        }
    }

    #[test]
    fn noop_handle_never_completes() {
        let detached = ShutdownHandle::noop();

        // Nothing can trigger a no-op handle, so it stays untriggered and pending no matter how often it's polled:
        assert!(!detached.is_triggered());

        let mut task = spawn(detached);
        assert_pending!(task.poll());
        assert_pending!(task.poll());
        assert!(!task.is_woken());
    }

    #[test]
    fn coordinator_shutdown_and_wait_returns_immediately_with_no_handles() {
        let coordinator = ShutdownCoordinator::default();

        // Nothing was ever registered, so the very first poll of `shutdown_and_wait` completes:
        let mut waiter = spawn(coordinator.shutdown_and_wait());
        assert_ready!(waiter.poll());
    }

    #[test]
    fn coordinator_shutdown_and_wait_ignores_handle_dropped_before_shutdown() {
        let mut coordinator = ShutdownCoordinator::default();

        // Registering a handle and dropping it _before_ shutdown leaves nothing outstanding, since the drop
        // deregisters it:
        let short_lived = coordinator.register();
        drop(short_lived);

        // No handles remain, so the very first poll of `shutdown_and_wait` completes:
        let mut waiter = spawn(coordinator.shutdown_and_wait());
        assert_ready!(waiter.poll());
    }

    #[test]
    fn coordinator_shutdown_waits_for_outstanding_handle_to_drop_single() {
        let mut coordinator = ShutdownCoordinator::default();
        let observer = coordinator.register();

        let mut task = spawn(observer);
        assert!(!task.is_woken());
        assert_pending!(task.poll());

        // Polling `shutdown_and_wait` once signals the outstanding handle, then parks until that handle is dropped:
        let mut waiter = spawn(coordinator.shutdown_and_wait());
        assert!(!waiter.is_woken());
        assert_pending!(waiter.poll());
        assert!(!waiter.is_woken());

        // The handle has been woken and can now resolve, yet the coordinator must stay asleep until the handle is
        // really dropped:
        assert!(task.is_woken());
        assert_ready!(task.poll());

        assert_pending!(waiter.poll());
        assert!(!waiter.is_woken());

        drop(task);

        assert!(waiter.is_woken());
        assert_ready!(waiter.poll());
    }

    #[test]
    fn coordinator_shutdown_waits_for_outstanding_handle_to_drop_multiple() {
        let mut coordinator = ShutdownCoordinator::default();
        let observer_a = coordinator.register();
        let observer_b = coordinator.register();

        let mut task_a = spawn(observer_a);
        let mut task_b = spawn(observer_b);

        assert!(!task_a.is_woken());
        assert!(!task_b.is_woken());
        assert_pending!(task_a.poll());
        assert_pending!(task_b.poll());

        // Polling `shutdown_and_wait` once signals the outstanding handles, then parks until they are all dropped:
        let mut waiter = spawn(coordinator.shutdown_and_wait());
        assert!(!waiter.is_woken());
        assert_pending!(waiter.poll());
        assert!(!waiter.is_woken());

        // Both handles have been woken and can now resolve, yet the coordinator must stay asleep until they are
        // really dropped:
        assert!(task_a.is_woken());
        assert_ready!(task_a.poll());

        assert!(task_b.is_woken());
        assert_ready!(task_b.poll());

        assert!(!waiter.is_woken());
        assert_pending!(waiter.poll());

        // Dropping only the first handle must leave the shutdown unresolved:
        drop(task_a);

        assert!(!waiter.is_woken());
        assert_pending!(waiter.poll());

        // Dropping the second (and last) handle is what finally resolves the shutdown:
        drop(task_b);

        assert!(waiter.is_woken());
        assert_ready!(waiter.poll());
    }
}