saluki_core/runtime/shutdown.rs
1use std::{
2 future::{pending, Future},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::FutureExt as _;
8use tokio::sync::oneshot;
9
10/// A shutdown signal for a process.
11///
12/// This struct can be used to wait for a shutdown signal from the supervisor to which the process belongs.
13pub struct ProcessShutdown {
14 shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
15}
16
17/// A handle to trigger process shutdown.
18pub struct ShutdownHandle {
19 shutdown_tx: oneshot::Sender<()>,
20}
21
22impl ProcessShutdown {
23 /// Creates a new `ProcessShutdown` and `ShutdownHandle` pair.
24 ///
25 /// When `ShutdownHandle` is triggered, or dropped, `ProcessShutdown` will resolve.
26 pub fn paired() -> (Self, ShutdownHandle) {
27 let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29 let handle = ShutdownHandle { shutdown_tx };
30
31 let process_shutdown = Self {
32 shutdown: Some(Box::pin(async move {
33 let _ = shutdown_rx.await;
34 })),
35 };
36
37 (process_shutdown, handle)
38 }
39
40 /// Creates a new `ProcessShutdown` from the given `future`.
41 ///
42 /// `ProcessShutdown` will be resolve only once `future` resolves.
43 pub fn wrapped<F: Future + Send + 'static>(future: F) -> Self {
44 Self {
45 shutdown: Some(Box::pin(async move {
46 future.await;
47 })),
48 }
49 }
50
51 /// Creates a new `ProcessShutdown` that never resolves.
52 ///
53 /// This is useful for cases where a `ProcessShutdown` is required, but no shutdown signal is expected.
54 pub fn noop() -> Self {
55 Self {
56 shutdown: Some(Box::pin(async { pending().await })),
57 }
58 }
59
60 /// Waits for the shutdown signal to be received.
61 ///
62 /// If the shutdown signal has been received during a previous call to this function, this function will return
63 /// immediately for all subsequent calls.
64 ///
65 /// `ProcessShutdown` also implements [`Future`] directly, which can be useful when passing it to APIs
66 /// that accept a generic future (e.g., as a shutdown signal parameter).
67 pub async fn wait_for_shutdown(&mut self) {
68 if let Some(shutdown_rx) = self.shutdown.take() {
69 let _ = shutdown_rx.await;
70 }
71 }
72}
73
74/// Implements [`Future`] for direct use in `select!` or as a generic shutdown signal.
75///
76/// This is equivalent to calling [`ProcessShutdown::wait_for_shutdown`] — once the shutdown signal is received
77/// (or has been received previously), the future resolves immediately.
78impl Future for ProcessShutdown {
79 type Output = ();
80
81 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82 if let Some(mut shutdown_rx) = self.shutdown.take() {
83 match shutdown_rx.poll_unpin(cx) {
84 Poll::Pending => {
85 self.shutdown = Some(shutdown_rx);
86 Poll::Pending
87 }
88 Poll::Ready(()) => Poll::Ready(()),
89 }
90 } else {
91 Poll::Ready(())
92 }
93 }
94}
95
96impl ShutdownHandle {
97 /// Triggers the process to shutdown.
98 pub fn trigger(self) {
99 let _ = self.shutdown_tx.send(());
100 }
101}