saluki_core/runtime/shutdown.rs
1use std::{
2 future::{pending, Future},
3 pin::Pin,
4};
5
6use tokio::sync::oneshot;
7
/// The receiving side of a process shutdown signal.
///
/// A process holds one of these and awaits it (via `wait_for_shutdown`) to learn when the supervisor it belongs to
/// wants it to stop.
pub struct ProcessShutdown {
    /// Future that completes when shutdown is requested; `None` once `wait_for_shutdown` has consumed it.
    shutdown: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
14
15/// A handle to trigger process shutdown.
16pub struct ShutdownHandle {
17 shutdown_tx: oneshot::Sender<()>,
18}
19
20impl ProcessShutdown {
21 /// Creates a new `ProcessShutdown` and `ShutdownHandle` pair.
22 ///
23 /// When `ShutdownHandle` is triggered, or dropped, `ProcessShutdown` will resolve.
24 pub fn paired() -> (Self, ShutdownHandle) {
25 let (shutdown_tx, shutdown_rx) = oneshot::channel();
26
27 let handle = ShutdownHandle { shutdown_tx };
28
29 let process_shutdown = Self {
30 shutdown: Some(Box::pin(async move {
31 let _ = shutdown_rx.await;
32 })),
33 };
34
35 (process_shutdown, handle)
36 }
37
38 /// Creates a new `ProcessShutdown` from the given `future`.
39 ///
40 /// `ProcessShutdown` will be resolve only once `future` resolves.
41 pub fn wrapped<F: Future + Send + 'static>(future: F) -> Self {
42 Self {
43 shutdown: Some(Box::pin(async move {
44 future.await;
45 })),
46 }
47 }
48
49 /// Creates a new `ProcessShutdown` that never resolves.
50 ///
51 /// This is useful for cases where a `ProcessShutdown` is required, but no shutdown signal is expected.
52 pub fn noop() -> Self {
53 Self {
54 shutdown: Some(Box::pin(async { pending().await })),
55 }
56 }
57
58 /// Waits for the shutdown signal to be received.
59 ///
60 /// If the shutdown signal has been received during a previous call to this function, this function will return
61 /// immediately for all subsequent calls.
62 pub async fn wait_for_shutdown(&mut self) {
63 if let Some(shutdown_rx) = self.shutdown.take() {
64 let _ = shutdown_rx.await;
65 }
66 }
67}
68
69impl ShutdownHandle {
70 /// Triggers the process to shutdown.
71 pub fn trigger(self) {
72 let _ = self.shutdown_tx.send(());
73 }
74}