Skip to main content

saluki_core/runtime/
dedicated.rs

1//! Dedicated runtime support for supervisors.
2
3use std::{
4    future::Future,
5    io,
6    pin::Pin,
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11    task::{ready, Context, Poll},
12    thread::JoinHandle,
13};
14
15use saluki_error::{generic_error, GenericError};
16use tokio::sync::oneshot;
17
18use super::{
19    shutdown::ProcessShutdown,
20    state::DataspaceRegistry,
21    supervisor::{Supervisor, SupervisorError},
22};
23
24/// Configuration for a dedicated Tokio runtime.
25#[derive(Clone, Debug)]
26pub struct RuntimeConfiguration {
27    /// Number of worker threads.
28    worker_threads: usize,
29}
30
31impl RuntimeConfiguration {
32    /// Creates a new single-threaded `RuntimeConfiguration`.
33    ///
34    /// The underlying executor implementation is based on `tokio`'s "current thread" runtime.
35    pub const fn single_threaded() -> Self {
36        Self { worker_threads: 1 }
37    }
38
39    /// Creates a new multi-threaded `RuntimeConfiguration` with the given number of worker threads.
40    ///
41    /// The underlying executor implementation is based on `tokio`'s multi-threaded runtime.
42    pub const fn multi_threaded(worker_threads: usize) -> Self {
43        Self { worker_threads }
44    }
45
46    /// Builds the Tokio runtime from this configuration.
47    pub(crate) fn build(&self, supervisor_id: &str) -> io::Result<tokio::runtime::Runtime> {
48        let supervisor_id = supervisor_id.to_string();
49        let thread_id = Arc::new(AtomicUsize::new(0));
50
51        if self.worker_threads == 1 {
52            tokio::runtime::Builder::new_current_thread().enable_all().build()
53        } else {
54            tokio::runtime::Builder::new_multi_thread()
55                .enable_all()
56                .enable_alt_timer()
57                .worker_threads(self.worker_threads)
58                .thread_name_fn(move || {
59                    let new_thread_id = thread_id.fetch_add(1, Ordering::SeqCst);
60                    format!("{}-sup-{:02}", supervisor_id, new_thread_id)
61                })
62                .build()
63        }
64    }
65}
66
67/// Controls which runtime a supervisor runs on.
68#[derive(Clone, Debug, Default)]
69pub enum RuntimeMode {
70    /// Run on the ambient runtime (default).
71    ///
72    /// The supervisor runs on whatever Tokio runtime is currently active when it is spawned.
73    #[default]
74    Ambient,
75
76    /// Run on a dedicated runtime with the given configuration.
77    ///
78    /// The supervisor spawns its own OS thread(s) and Tokio runtime, providing runtime isolation from the parent
79    /// supervisor.
80    Dedicated(RuntimeConfiguration),
81}
82
83/// A handle to a supervisor running in a dedicated runtime.
84///
85/// Allows capturing any runtime initialization failures as well as the result of the supervisor's execution.
86pub(crate) struct DedicatedRuntimeHandle {
87    supervisor_id: String,
88    init_rx: Option<oneshot::Receiver<Result<(), GenericError>>>,
89    result_rx: oneshot::Receiver<Result<(), SupervisorError>>,
90    thread_handle: Option<JoinHandle<()>>,
91}
92
93impl Future for DedicatedRuntimeHandle {
94    type Output = Result<(), SupervisorError>;
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97        // First, check if initialization is still pending.
98        //
99        // NOTE: This is runtime-level initialization (building the Tokio runtime and OS thread), not
100        // supervisor child initialization. These errors are always fatal and non-restartable.
101        if let Some(init_rx) = self.init_rx.as_mut() {
102            let init_result = ready!(Pin::new(init_rx).poll(cx));
103            let maybe_init_error = match init_result {
104                Ok(Ok(())) => None,
105                Ok(Err(e)) => Some(e),
106                Err(_) => Some(generic_error!(
107                    "no initialization result received; runtime creation likely panicked"
108                )),
109            };
110
111            self.init_rx = None;
112
113            if let Some(error) = maybe_init_error {
114                // Join on the thread to clean up.
115                if let Some(handle) = self.thread_handle.take() {
116                    let _ = handle.join();
117                }
118
119                return Poll::Ready(Err(SupervisorError::FailedToInitialize {
120                    child_name: self.supervisor_id.clone(),
121                    source: error.into(),
122                }));
123            }
124        }
125
126        // Check for a final result from the supervisor. The structured `SupervisorError` is preserved
127        // so the parent can distinguish init failures from runtime failures.
128        //
129        // If the channel is closed without a result, the runtime thread panicked or exited after
130        // successful init — this is a runtime failure, not an initialization failure.
131        let result = ready!(Pin::new(&mut self.result_rx).poll(cx)).unwrap_or_else(|_| Err(SupervisorError::Shutdown));
132
133        // Join on the thread to clean up.
134        if let Some(handle) = self.thread_handle.take() {
135            let _ = handle.join();
136        }
137
138        Poll::Ready(result)
139    }
140}
141
142/// Spawns a supervisor in a dedicated runtime on a new OS thread, returning a handle to await for the result.
143///
144/// The returned handle will resolve if initialization of the runtime failed, or after the supervisor completes,
145/// whether due to shutdown or failure.
146///
147/// A background OS thread is spawned to run the supervisor, named after the supervisor's ID: `<id>-sup-rt`. For
148/// multi-threaded runtimes, additional threads will be spawned following a similar naming convention:
149/// `<id>-sup-[0-9]+`.
150///
151/// # Errors
152///
153/// If the OS thread cannot be spawned, an error is returned.
154pub(crate) fn spawn_dedicated_runtime(
155    mut supervisor: Supervisor, config: RuntimeConfiguration, process_shutdown: ProcessShutdown,
156    dataspace: DataspaceRegistry,
157) -> Result<DedicatedRuntimeHandle, GenericError> {
158    let (init_tx, init_rx) = oneshot::channel();
159    let (result_tx, result_rx) = oneshot::channel();
160
161    let supervisor_id = supervisor.id().to_string();
162    let thread_name = format!("{}-sup-rt", supervisor_id);
163    let thread_handle = std::thread::Builder::new()
164        .name(thread_name.clone())
165        .spawn(move || {
166            // Build the runtime.
167            let runtime = match config.build(supervisor.id()) {
168                Ok(rt) => rt,
169                Err(e) => {
170                    let _ = init_tx.send(Err(generic_error!("Failed to build dedicated runtime: {}", e)));
171                    return;
172                }
173            };
174
175            // Signal that initialization succeeded.
176            if init_tx.send(Ok(())).is_err() {
177                // Parent is no longer listening, bail out.
178                return;
179            }
180
181            // Run the supervisor to completion and send the result back.
182            //
183            // We pass the parent's dataspace so the nested supervisor inherits it across the
184            // thread boundary rather than creating a new one.
185            let result = runtime.block_on(supervisor.run_with_process_shutdown(process_shutdown, Some(dataspace)));
186            let _ = result_tx.send(result);
187        })
188        .map_err(|e| generic_error!("Failed to spawn dedicated runtime thread '{}': {}", thread_name, e))?;
189
190    Ok(DedicatedRuntimeHandle {
191        supervisor_id,
192        init_rx: Some(init_rx),
193        result_rx,
194        thread_handle: Some(thread_handle),
195    })
196}