saluki_core/topology/
shutdown.rs1use std::{
3 future::Future,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicUsize, Ordering},
7 Arc, Mutex,
8 },
9 task::{ready, Context, Poll},
10};
11
12use slab::Slab;
13use tokio::sync::{
14 oneshot::{channel, error::TryRecvError, Receiver, Sender},
15 Notify,
16};
17
18#[derive(Default)]
23pub struct ComponentShutdownCoordinator {
24 handles: Vec<Sender<()>>,
25}
26
27pub struct ComponentShutdownHandle {
29 receiver: Receiver<()>,
30}
31
32impl ComponentShutdownCoordinator {
33 pub fn register(&mut self) -> ComponentShutdownHandle {
35 let (sender, receiver) = channel();
36 self.handles.push(sender);
37
38 ComponentShutdownHandle { receiver }
39 }
40
41 pub fn shutdown(self) {
43 for handle in self.handles {
44 let _ = handle.send(());
45 }
46 }
47}
48
49impl Future for ComponentShutdownHandle {
50 type Output = ();
51
52 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
53 let receiver = unsafe { self.map_unchecked_mut(|s| &mut s.receiver) };
55 receiver.poll(cx).map(|_| ())
56 }
57}
58
59#[derive(Default)]
60struct State {
61 waiters: Mutex<Slab<Sender<()>>>,
62 outstanding_handles: AtomicUsize,
63 shutdown_complete: Notify,
64}
65
66#[derive(Default)]
68pub struct DynamicShutdownCoordinator {
69 state: Arc<State>,
70}
71
72pub struct DynamicShutdownHandle {
77 state: Arc<State>,
78 wait_tx_idx: usize,
79 wait_rx: Receiver<()>,
80 notified: bool,
81}
82
83impl DynamicShutdownCoordinator {
84 pub fn register(&mut self) -> DynamicShutdownHandle {
86 let (wait_tx, wait_rx) = channel();
87 let mut waiters = self.state.waiters.lock().unwrap();
88 let wait_tx_idx = waiters.insert(wait_tx);
89 self.state.outstanding_handles.fetch_add(1, Ordering::Release);
90
91 DynamicShutdownHandle {
92 state: Arc::clone(&self.state),
93 wait_tx_idx,
94 wait_rx,
95 notified: false,
96 }
97 }
98
99 pub async fn shutdown(self) {
104 let shutdown_complete = self.state.shutdown_complete.notified();
107
108 {
109 let mut waiters = self.state.waiters.lock().unwrap();
110 if waiters.is_empty() {
111 return;
112 }
113
114 for waiter in waiters.drain() {
115 let _ = waiter.send(());
116 }
117 }
118
119 shutdown_complete.await;
120 }
121}
122
123impl Drop for DynamicShutdownHandle {
124 fn drop(&mut self) {
125 if !self.notified {
128 let mut waiters = self.state.waiters.lock().unwrap();
129
130 if let Err(TryRecvError::Empty) = self.wait_rx.try_recv() {
131 waiters.remove(self.wait_tx_idx);
132 }
133 }
134
135 if self.state.outstanding_handles.fetch_sub(1, Ordering::AcqRel) == 1 {
136 self.state.shutdown_complete.notify_waiters();
142 }
143 }
144}
145
146impl Future for DynamicShutdownHandle {
147 type Output = ();
148
149 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150 if self.notified {
151 return Poll::Ready(());
152 }
153
154 let this = self.get_mut();
155
156 let _ = ready!(Pin::new(&mut this.wait_rx).poll(cx));
158
159 this.notified = true;
160 Poll::Ready(())
161 }
162}