saluki_core/topology/
shutdown.rs

1//! Helpers for signaling the controlled shutdown of tasks.
2use std::{
3    future::Future,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicUsize, Ordering},
7        Arc, Mutex,
8    },
9    task::{ready, Context, Poll},
10};
11
12use slab::Slab;
13use tokio::sync::{
14    oneshot::{channel, error::TryRecvError, Receiver, Sender},
15    Notify,
16};
17
18/// A component-specific shutdown coordinator.
19///
20/// This coordinator is designed for use by the topology to signal components to shutdown. Once a handle is registerd,
21/// it can not be unregistered. If you required the ability to unregister handles, consider using [`DynamicShutdownCoordinator`].
22#[derive(Default)]
23pub struct ComponentShutdownCoordinator {
24    handles: Vec<Sender<()>>,
25}
26
27/// A component shutdown handle.
28pub struct ComponentShutdownHandle {
29    receiver: Receiver<()>,
30}
31
32impl ComponentShutdownCoordinator {
33    /// Registers a shutdown handle.
34    pub fn register(&mut self) -> ComponentShutdownHandle {
35        let (sender, receiver) = channel();
36        self.handles.push(sender);
37
38        ComponentShutdownHandle { receiver }
39    }
40
41    /// Triggers shutdown and notifies all registered handles.
42    pub fn shutdown(self) {
43        for handle in self.handles {
44            let _ = handle.send(());
45        }
46    }
47}
48
49impl Future for ComponentShutdownHandle {
50    type Output = ();
51
52    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
53        // SAFETY: `Receiver` is `Unpin`, so we can safely project it.
54        let receiver = unsafe { self.map_unchecked_mut(|s| &mut s.receiver) };
55        receiver.poll(cx).map(|_| ())
56    }
57}
58
59#[derive(Default)]
60struct State {
61    waiters: Mutex<Slab<Sender<()>>>,
62    outstanding_handles: AtomicUsize,
63    shutdown_complete: Notify,
64}
65
66/// A shutdown coordinator that can dynamically register handles for shutdown.
67#[derive(Default)]
68pub struct DynamicShutdownCoordinator {
69    state: Arc<State>,
70}
71
72/// A dynamic shutdown handle.
73///
74/// This handle is a `Future` which resolves when the shutdown coordinator has triggered shutdown. Additionally, it is
75/// used to signal to the coordinator, on drop, that the user of the handle has completed shutdown.
76pub struct DynamicShutdownHandle {
77    state: Arc<State>,
78    wait_tx_idx: usize,
79    wait_rx: Receiver<()>,
80    notified: bool,
81}
82
83impl DynamicShutdownCoordinator {
84    /// Registers a shutdown handle.
85    pub fn register(&mut self) -> DynamicShutdownHandle {
86        let (wait_tx, wait_rx) = channel();
87        let mut waiters = self.state.waiters.lock().unwrap();
88        let wait_tx_idx = waiters.insert(wait_tx);
89        self.state.outstanding_handles.fetch_add(1, Ordering::Release);
90
91        DynamicShutdownHandle {
92            state: Arc::clone(&self.state),
93            wait_tx_idx,
94            wait_rx,
95            notified: false,
96        }
97    }
98
99    /// Triggers shutdown and notifies all outstanding handles, waiting until all handles have been dropped.
100    ///
101    /// If there are any outstanding handles, they are signaled to shutdown and this function will only return once all
102    /// outstanding handles have been dropped. If there are no outstanding handles, the function returns immediately.
103    pub async fn shutdown(self) {
104        // Register ourselves for the shutdown notification here, which ensures that if we do have outstanding handles
105        // which are only shutdown when we signal them below, we'll be properly notified when they're all dropped.
106        let shutdown_complete = self.state.shutdown_complete.notified();
107
108        {
109            let mut waiters = self.state.waiters.lock().unwrap();
110            if waiters.is_empty() {
111                return;
112            }
113
114            for waiter in waiters.drain() {
115                let _ = waiter.send(());
116            }
117        }
118
119        shutdown_complete.await;
120    }
121}
122
123impl Drop for DynamicShutdownHandle {
124    fn drop(&mut self) {
125        // If we haven't definitively been notified yet (possibly due simply to not yet being polled), then we do a
126        // fallible receive here. We only remove the wait sender if it was never used/dropped.
127        if !self.notified {
128            let mut waiters = self.state.waiters.lock().unwrap();
129
130            if let Err(TryRecvError::Empty) = self.wait_rx.try_recv() {
131                waiters.remove(self.wait_tx_idx);
132            }
133        }
134
135        if self.state.outstanding_handles.fetch_sub(1, Ordering::AcqRel) == 1 {
136            // We're the last handle currently registered to this coordinator, so notify the coordinator.
137            //
138            // Crucially, we use `notify_waiters` because we don't want to store a wakeup: we may be the last handle,
139            // but shutdown may not have been triggered yet. This ensures that the coordinator can properly distinguish
140            // when all handles have dropped during actual shutdown.
141            self.state.shutdown_complete.notify_waiters();
142        }
143    }
144}
145
146impl Future for DynamicShutdownHandle {
147    type Output = ();
148
149    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150        if self.notified {
151            return Poll::Ready(());
152        }
153
154        let this = self.get_mut();
155
156        // SAFETY: `Receiver` is `Unpin`, so we can safely project it.
157        let _ = ready!(Pin::new(&mut this.wait_rx).poll(cx));
158
159        this.notified = true;
160        Poll::Ready(())
161    }
162}