Skip to main content

saluki_core/runtime/
shutdown.rs

1use std::{
2    future::{pending, Future},
3    pin::Pin,
4};
5
6use tokio::sync::oneshot;
7
/// Boxed, pinned, `Send` future that completes when the process should shut down.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// The receiving side of a process shutdown signal.
///
/// A process holds this value and awaits it to learn when the supervisor it belongs to wants it to stop.
pub struct ProcessShutdown {
    /// The pending shutdown future, or `None` when there is nothing left to wait on.
    shutdown: Option<ShutdownFuture>,
}
14
15/// A handle to trigger process shutdown.
16pub struct ShutdownHandle {
17    shutdown_tx: oneshot::Sender<()>,
18}
19
20impl ProcessShutdown {
21    /// Creates a new `ProcessShutdown` and `ShutdownHandle` pair.
22    ///
23    /// When `ShutdownHandle` is triggered, or dropped, `ProcessShutdown` will resolve.
24    pub fn paired() -> (Self, ShutdownHandle) {
25        let (shutdown_tx, shutdown_rx) = oneshot::channel();
26
27        let handle = ShutdownHandle { shutdown_tx };
28
29        let process_shutdown = Self {
30            shutdown: Some(Box::pin(async move {
31                let _ = shutdown_rx.await;
32            })),
33        };
34
35        (process_shutdown, handle)
36    }
37
38    /// Creates a new `ProcessShutdown` from the given `future`.
39    ///
40    /// `ProcessShutdown` will be resolve only once `future` resolves.
41    pub fn wrapped<F: Future + Send + 'static>(future: F) -> Self {
42        Self {
43            shutdown: Some(Box::pin(async move {
44                future.await;
45            })),
46        }
47    }
48
49    /// Creates a new `ProcessShutdown` that never resolves.
50    ///
51    /// This is useful for cases where a `ProcessShutdown` is required, but no shutdown signal is expected.
52    pub fn noop() -> Self {
53        Self {
54            shutdown: Some(Box::pin(async { pending().await })),
55        }
56    }
57
58    /// Waits for the shutdown signal to be received.
59    ///
60    /// If the shutdown signal has been received during a previous call to this function, this function will return
61    /// immediately for all subsequent calls.
62    pub async fn wait_for_shutdown(&mut self) {
63        if let Some(shutdown_rx) = self.shutdown.take() {
64            let _ = shutdown_rx.await;
65        }
66    }
67}
68
69impl ShutdownHandle {
70    /// Triggers the process to shutdown.
71    pub fn trigger(self) {
72        let _ = self.shutdown_tx.send(());
73    }
74}