Skip to main content

saluki_core/runtime/
dedicated.rs

1//! Dedicated runtime support for supervisors.
2
3use std::{
4    future::Future,
5    io,
6    pin::Pin,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11    task::{ready, Context, Poll},
12    thread::JoinHandle,
13};
14
15use saluki_error::{generic_error, GenericError};
16use tokio::sync::oneshot;
17
18use super::{shutdown::ProcessShutdown, supervisor::Supervisor};
19
20/// Configuration for a dedicated Tokio runtime.
21#[derive(Clone, Debug)]
22pub struct RuntimeConfiguration {
23    /// Number of worker threads.
24    worker_threads: usize,
25}
26
27impl RuntimeConfiguration {
28    /// Creates a new single-threaded `RuntimeConfiguration`.
29    ///
30    /// The underlying executor implementation is based on `tokio`'s "current thread" runtime.
31    pub const fn single_threaded() -> Self {
32        Self { worker_threads: 1 }
33    }
34
35    /// Creates a new multi-threaded `RuntimeConfiguration` with the given number of worker threads.
36    ///
37    /// The underlying executor implementation is based on `tokio`'s multi-threaded runtime.
38    pub const fn multi_threaded(worker_threads: usize) -> Self {
39        Self { worker_threads }
40    }
41
42    /// Builds the Tokio runtime from this configuration.
43    pub(crate) fn build(&self, supervisor_id: &str) -> io::Result<tokio::runtime::Runtime> {
44        let supervisor_id = supervisor_id.to_string();
45        let thread_id = Arc::new(AtomicUsize::new(0));
46
47        if self.worker_threads == 1 {
48            tokio::runtime::Builder::new_current_thread().enable_all().build()
49        } else {
50            tokio::runtime::Builder::new_multi_thread()
51                .enable_all()
52                .enable_alt_timer()
53                .worker_threads(self.worker_threads)
54                .thread_name_fn(move || {
55                    let new_thread_id = thread_id.fetch_add(1, Ordering::SeqCst);
56                    format!("{}-sup-{:02}", supervisor_id, new_thread_id)
57                })
58                .build()
59        }
60    }
61}
62
63/// Controls which runtime a supervisor runs on.
64#[derive(Clone, Debug, Default)]
65pub enum RuntimeMode {
66    /// Run on the ambient runtime (default).
67    ///
68    /// The supervisor runs on whatever Tokio runtime is currently active when it is spawned.
69    #[default]
70    Ambient,
71
72    /// Run on a dedicated runtime with the given configuration.
73    ///
74    /// The supervisor spawns its own OS thread(s) and Tokio runtime, providing runtime isolation from the parent
75    /// supervisor.
76    Dedicated(RuntimeConfiguration),
77}
78
79/// A handle to a supervisor running in a dedicated runtime.
80///
81/// Allows capturing any runtime initialization failures as well as the result of the supervisor's execution.
82pub(crate) struct DedicatedRuntimeHandle {
83    init_rx: Option<oneshot::Receiver<Result<(), GenericError>>>,
84    result_rx: oneshot::Receiver<Result<(), GenericError>>,
85    thread_handle: Option<JoinHandle<()>>,
86}
87
88impl Future for DedicatedRuntimeHandle {
89    type Output = Result<(), GenericError>;
90
91    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        // First, check if initialization is still pending.
93        if let Some(init_rx) = self.init_rx.as_mut() {
94            let init_result = ready!(Pin::new(init_rx).poll(cx));
95            let maybe_init_error = match init_result {
96                Ok(Ok(())) => None,
97                Ok(Err(e)) => Some(e),
98                Err(_) => Some(generic_error!(
99                    "no initialization result received; runtime creation likely panicked"
100                )),
101            };
102
103            self.init_rx = None;
104
105            if let Some(error) = maybe_init_error {
106                // Join on the thread to clean up.
107                if let Some(handle) = self.thread_handle.take() {
108                    let _ = handle.join();
109                }
110
111                return Poll::Ready(Err(error));
112            }
113        }
114
115        // Check for a final result from the supervisor.
116        let result = ready!(Pin::new(&mut self.result_rx).poll(cx))
117            .map_err(|_| generic_error!("no supervisor result received; supervisor likely panicked"))?;
118
119        // Join on the thread to clean up.
120        if let Some(handle) = self.thread_handle.take() {
121            let _ = handle.join();
122        }
123
124        Poll::Ready(result)
125    }
126}
127
128/// Spawns a supervisor in a dedicated runtime on a new OS thread, returning a handle to await for the result.
129///
130/// The returned handle will resolve if initialization of the runtime failed, or after the supervisor completes,
131/// whether due to shutdown or failure.
132///
133/// A background OS thread is spawned to run the supervisor, named after the supervisor's ID: `<id>-sup-rt`. For
134/// multi-threaded runtimes, additional threads will be spawned following a similar naming convention:
135/// `<id>-sup-[0-9]+`.
136///
137/// # Errors
138///
139/// If the OS thread cannot be spawned, an error is returned.
140pub(crate) fn spawn_dedicated_runtime(
141    mut supervisor: Supervisor, config: RuntimeConfiguration, process_shutdown: ProcessShutdown,
142) -> Result<DedicatedRuntimeHandle, GenericError> {
143    let (init_tx, init_rx) = oneshot::channel();
144    let (result_tx, result_rx) = oneshot::channel();
145
146    let thread_name = format!("{}-sup-rt", supervisor.id());
147    let thread_handle = std::thread::Builder::new()
148        .name(thread_name.clone())
149        .spawn(move || {
150            // Build the runtime.
151            let runtime = match config.build(supervisor.id()) {
152                Ok(rt) => rt,
153                Err(e) => {
154                    let _ = init_tx.send(Err(generic_error!("Failed to build dedicated runtime: {}", e)));
155                    return;
156                }
157            };
158
159            // Signal that initialization succeeded.
160            if init_tx.send(Ok(())).is_err() {
161                // Parent is no longer listening, bail out.
162                return;
163            }
164
165            // Run the supervisor to completion and send the result back.
166            //
167            // We ignore failures with sending the result because we can't do anything about it anyways.
168            let result = runtime.block_on(supervisor.run_with_process_shutdown(process_shutdown));
169            let _ = result_tx.send(result.map_err(|e| e.into()));
170        })
171        .map_err(|e| generic_error!("Failed to spawn dedicated runtime thread '{}': {}", thread_name, e))?;
172
173    Ok(DedicatedRuntimeHandle {
174        init_rx: Some(init_rx),
175        result_rx,
176        thread_handle: Some(thread_handle),
177    })
178}