Skip to main content

saluki_core/runtime/
shutdown.rs

1use std::{
2    future::{pending, Future},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures::FutureExt as _;
8use tokio::sync::oneshot;
9
10/// A shutdown signal for a process.
11///
12/// This struct can be used to wait for a shutdown signal from the supervisor to which the process belongs.
13pub struct ProcessShutdown {
14    shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
15}
16
17/// A handle to trigger process shutdown.
18pub struct ShutdownHandle {
19    shutdown_tx: oneshot::Sender<()>,
20}
21
22impl ProcessShutdown {
23    /// Creates a new `ProcessShutdown` and `ShutdownHandle` pair.
24    ///
25    /// When `ShutdownHandle` is triggered, or dropped, `ProcessShutdown` will resolve.
26    pub fn paired() -> (Self, ShutdownHandle) {
27        let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29        let handle = ShutdownHandle { shutdown_tx };
30
31        let process_shutdown = Self {
32            shutdown: Some(Box::pin(async move {
33                let _ = shutdown_rx.await;
34            })),
35        };
36
37        (process_shutdown, handle)
38    }
39
40    /// Creates a new `ProcessShutdown` from the given `future`.
41    ///
42    /// `ProcessShutdown` will be resolve only once `future` resolves.
43    pub fn wrapped<F: Future + Send + 'static>(future: F) -> Self {
44        Self {
45            shutdown: Some(Box::pin(async move {
46                future.await;
47            })),
48        }
49    }
50
51    /// Creates a new `ProcessShutdown` that never resolves.
52    ///
53    /// This is useful for cases where a `ProcessShutdown` is required, but no shutdown signal is expected.
54    pub fn noop() -> Self {
55        Self {
56            shutdown: Some(Box::pin(async { pending().await })),
57        }
58    }
59
60    /// Waits for the shutdown signal to be received.
61    ///
62    /// If the shutdown signal has been received during a previous call to this function, this function will return
63    /// immediately for all subsequent calls.
64    ///
65    /// `ProcessShutdown` also implements [`Future`] directly, which can be useful when passing it to APIs
66    /// that accept a generic future (e.g., as a shutdown signal parameter).
67    pub async fn wait_for_shutdown(&mut self) {
68        if let Some(shutdown_rx) = self.shutdown.take() {
69            let _ = shutdown_rx.await;
70        }
71    }
72}
73
74/// Implements [`Future`] for direct use in `select!` or as a generic shutdown signal.
75///
76/// This is equivalent to calling [`ProcessShutdown::wait_for_shutdown`] — once the shutdown signal is received
77/// (or has been received previously), the future resolves immediately.
78impl Future for ProcessShutdown {
79    type Output = ();
80
81    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82        if let Some(mut shutdown_rx) = self.shutdown.take() {
83            match shutdown_rx.poll_unpin(cx) {
84                Poll::Pending => {
85                    self.shutdown = Some(shutdown_rx);
86                    Poll::Pending
87                }
88                Poll::Ready(()) => Poll::Ready(()),
89            }
90        } else {
91            Poll::Ready(())
92        }
93    }
94}
95
96impl ShutdownHandle {
97    /// Triggers the process to shutdown.
98    pub fn trigger(self) {
99        let _ = self.shutdown_tx.send(());
100    }
101}