saluki_core/runtime/
dedicated.rs1use std::{
4 future::Future,
5 io,
6 pin::Pin,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11 task::{ready, Context, Poll},
12 thread::JoinHandle,
13};
14
15use saluki_error::{generic_error, GenericError};
16use tokio::sync::oneshot;
17
18use super::{shutdown::ProcessShutdown, supervisor::Supervisor};
19
/// Settings used to build the Tokio runtime that backs a dedicated supervisor thread.
///
/// Instances are created through [`RuntimeConfiguration::single_threaded`] or
/// [`RuntimeConfiguration::multi_threaded`].
#[derive(Debug, Clone)]
pub struct RuntimeConfiguration {
    /// Number of worker threads requested for the runtime.
    ///
    /// A value of `1` selects a current-thread runtime when the configuration is built.
    worker_threads: usize,
}
26
27impl RuntimeConfiguration {
28 pub const fn single_threaded() -> Self {
32 Self { worker_threads: 1 }
33 }
34
35 pub const fn multi_threaded(worker_threads: usize) -> Self {
39 Self { worker_threads }
40 }
41
42 pub(crate) fn build(&self, supervisor_id: &str) -> io::Result<tokio::runtime::Runtime> {
44 let supervisor_id = supervisor_id.to_string();
45 let thread_id = Arc::new(AtomicUsize::new(0));
46
47 if self.worker_threads == 1 {
48 tokio::runtime::Builder::new_current_thread().enable_all().build()
49 } else {
50 tokio::runtime::Builder::new_multi_thread()
51 .enable_all()
52 .enable_alt_timer()
53 .worker_threads(self.worker_threads)
54 .thread_name_fn(move || {
55 let new_thread_id = thread_id.fetch_add(1, Ordering::SeqCst);
56 format!("{}-sup-{:02}", supervisor_id, new_thread_id)
57 })
58 .build()
59 }
60 }
61}
62
63#[derive(Clone, Debug, Default)]
65pub enum RuntimeMode {
66 #[default]
70 Ambient,
71
72 Dedicated(RuntimeConfiguration),
77}
78
79pub(crate) struct DedicatedRuntimeHandle {
83 init_rx: Option<oneshot::Receiver<Result<(), GenericError>>>,
84 result_rx: oneshot::Receiver<Result<(), GenericError>>,
85 thread_handle: Option<JoinHandle<()>>,
86}
87
88impl Future for DedicatedRuntimeHandle {
89 type Output = Result<(), GenericError>;
90
91 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 if let Some(init_rx) = self.init_rx.as_mut() {
94 let init_result = ready!(Pin::new(init_rx).poll(cx));
95 let maybe_init_error = match init_result {
96 Ok(Ok(())) => None,
97 Ok(Err(e)) => Some(e),
98 Err(_) => Some(generic_error!(
99 "no initialization result received; runtime creation likely panicked"
100 )),
101 };
102
103 self.init_rx = None;
104
105 if let Some(error) = maybe_init_error {
106 if let Some(handle) = self.thread_handle.take() {
108 let _ = handle.join();
109 }
110
111 return Poll::Ready(Err(error));
112 }
113 }
114
115 let result = ready!(Pin::new(&mut self.result_rx).poll(cx))
117 .map_err(|_| generic_error!("no supervisor result received; supervisor likely panicked"))?;
118
119 if let Some(handle) = self.thread_handle.take() {
121 let _ = handle.join();
122 }
123
124 Poll::Ready(result)
125 }
126}
127
128pub(crate) fn spawn_dedicated_runtime(
141 mut supervisor: Supervisor, config: RuntimeConfiguration, process_shutdown: ProcessShutdown,
142) -> Result<DedicatedRuntimeHandle, GenericError> {
143 let (init_tx, init_rx) = oneshot::channel();
144 let (result_tx, result_rx) = oneshot::channel();
145
146 let thread_name = format!("{}-sup-rt", supervisor.id());
147 let thread_handle = std::thread::Builder::new()
148 .name(thread_name.clone())
149 .spawn(move || {
150 let runtime = match config.build(supervisor.id()) {
152 Ok(rt) => rt,
153 Err(e) => {
154 let _ = init_tx.send(Err(generic_error!("Failed to build dedicated runtime: {}", e)));
155 return;
156 }
157 };
158
159 if init_tx.send(Ok(())).is_err() {
161 return;
163 }
164
165 let result = runtime.block_on(supervisor.run_with_process_shutdown(process_shutdown));
169 let _ = result_tx.send(result.map_err(|e| e.into()));
170 })
171 .map_err(|e| generic_error!("Failed to spawn dedicated runtime thread '{}': {}", thread_name, e))?;
172
173 Ok(DedicatedRuntimeHandle {
174 init_rx: Some(init_rx),
175 result_rx,
176 thread_handle: Some(thread_handle),
177 })
178}