saluki_core/runtime/
dedicated.rs1use std::{
4 future::Future,
5 io,
6 pin::Pin,
7 sync::{
8 atomic::{AtomicUsize, Ordering},
9 Arc,
10 },
11 task::{ready, Context, Poll},
12 thread::JoinHandle,
13};
14
15use saluki_error::{generic_error, GenericError};
16use tokio::sync::oneshot;
17
18use super::{
19 shutdown::ProcessShutdown,
20 state::DataspaceRegistry,
21 supervisor::{Supervisor, SupervisorError},
22};
23
/// Settings for the Tokio runtime that is built for a supervisor.
///
/// Values are created through [`RuntimeConfiguration::single_threaded`] or
/// [`RuntimeConfiguration::multi_threaded`].
#[derive(Clone, Debug)]
pub struct RuntimeConfiguration {
    /// Number of runtime worker threads. A value of `1` selects a current-thread runtime when the
    /// runtime is built.
    worker_threads: usize,
}
30
31impl RuntimeConfiguration {
32 pub const fn single_threaded() -> Self {
36 Self { worker_threads: 1 }
37 }
38
39 pub const fn multi_threaded(worker_threads: usize) -> Self {
43 Self { worker_threads }
44 }
45
46 pub(crate) fn build(&self, supervisor_id: &str) -> io::Result<tokio::runtime::Runtime> {
48 let supervisor_id = supervisor_id.to_string();
49 let thread_id = Arc::new(AtomicUsize::new(0));
50
51 if self.worker_threads == 1 {
52 tokio::runtime::Builder::new_current_thread().enable_all().build()
53 } else {
54 tokio::runtime::Builder::new_multi_thread()
55 .enable_all()
56 .enable_alt_timer()
57 .worker_threads(self.worker_threads)
58 .thread_name_fn(move || {
59 let new_thread_id = thread_id.fetch_add(1, Ordering::SeqCst);
60 format!("{}-sup-{:02}", supervisor_id, new_thread_id)
61 })
62 .build()
63 }
64 }
65}
66
67#[derive(Clone, Debug, Default)]
69pub enum RuntimeMode {
70 #[default]
74 Ambient,
75
76 Dedicated(RuntimeConfiguration),
81}
82
83pub(crate) struct DedicatedRuntimeHandle {
87 supervisor_id: String,
88 init_rx: Option<oneshot::Receiver<Result<(), GenericError>>>,
89 result_rx: oneshot::Receiver<Result<(), SupervisorError>>,
90 thread_handle: Option<JoinHandle<()>>,
91}
92
93impl Future for DedicatedRuntimeHandle {
94 type Output = Result<(), SupervisorError>;
95
96 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
97 if let Some(init_rx) = self.init_rx.as_mut() {
102 let init_result = ready!(Pin::new(init_rx).poll(cx));
103 let maybe_init_error = match init_result {
104 Ok(Ok(())) => None,
105 Ok(Err(e)) => Some(e),
106 Err(_) => Some(generic_error!(
107 "no initialization result received; runtime creation likely panicked"
108 )),
109 };
110
111 self.init_rx = None;
112
113 if let Some(error) = maybe_init_error {
114 if let Some(handle) = self.thread_handle.take() {
116 let _ = handle.join();
117 }
118
119 return Poll::Ready(Err(SupervisorError::FailedToInitialize {
120 child_name: self.supervisor_id.clone(),
121 source: error.into(),
122 }));
123 }
124 }
125
126 let result = ready!(Pin::new(&mut self.result_rx).poll(cx)).unwrap_or_else(|_| Err(SupervisorError::Shutdown));
132
133 if let Some(handle) = self.thread_handle.take() {
135 let _ = handle.join();
136 }
137
138 Poll::Ready(result)
139 }
140}
141
142pub(crate) fn spawn_dedicated_runtime(
155 mut supervisor: Supervisor, config: RuntimeConfiguration, process_shutdown: ProcessShutdown,
156 dataspace: DataspaceRegistry,
157) -> Result<DedicatedRuntimeHandle, GenericError> {
158 let (init_tx, init_rx) = oneshot::channel();
159 let (result_tx, result_rx) = oneshot::channel();
160
161 let supervisor_id = supervisor.id().to_string();
162 let thread_name = format!("{}-sup-rt", supervisor_id);
163 let thread_handle = std::thread::Builder::new()
164 .name(thread_name.clone())
165 .spawn(move || {
166 let runtime = match config.build(supervisor.id()) {
168 Ok(rt) => rt,
169 Err(e) => {
170 let _ = init_tx.send(Err(generic_error!("Failed to build dedicated runtime: {}", e)));
171 return;
172 }
173 };
174
175 if init_tx.send(Ok(())).is_err() {
177 return;
179 }
180
181 let result = runtime.block_on(supervisor.run_with_process_shutdown(process_shutdown, Some(dataspace)));
186 let _ = result_tx.send(result);
187 })
188 .map_err(|e| generic_error!("Failed to spawn dedicated runtime thread '{}': {}", thread_name, e))?;
189
190 Ok(DedicatedRuntimeHandle {
191 supervisor_id,
192 init_rx: Some(init_rx),
193 result_rx,
194 thread_handle: Some(thread_handle),
195 })
196}