1use std::{
2 future::Future,
3 ops::Deref,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicUsize, Ordering::Relaxed},
7 Arc,
8 },
9 task::{Context, Poll},
10};
11
12use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _, Tracked};
13use pin_project::{pin_project, pinned_drop};
14use tracing::{debug_span, instrument::Instrumented, Instrument as _};
15
16use super::state::{DataspaceRegistry, CURRENT_DATASPACE};
17
/// Process-wide counter that `Id::new` draws fresh identifiers from.
///
/// It starts at 1; the value 0 is used by `Id::ROOT`.
static GLOBAL_PROCESS_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
19
/// Numeric identifier of a process.
///
/// A thin wrapper around a `usize`; cheap to copy and usable as a hash-map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Id(usize);
27
28tokio::task_local! {
29 pub(crate) static CURRENT_PROCESS_ID: Id;
30}
31
32impl Id {
33 pub const ROOT: Self = Self(0);
35
36 pub fn new() -> Self {
38 let id = GLOBAL_PROCESS_ID_COUNTER.fetch_add(1, Relaxed);
39 Self(id)
40 }
41
42 pub fn current() -> Self {
46 CURRENT_PROCESS_ID.try_with(|id| *id).unwrap_or(Self::ROOT)
47 }
48
49 pub fn as_usize(&self) -> usize {
51 self.0
52 }
53}
54
/// Name of a process.
///
/// Backed by an `Arc<str>`, so cloning a name only bumps a reference count.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Name(Arc<str>);
69
70impl Name {
71 pub(crate) fn root<N: AsRef<str>>(name: N) -> Option<Self> {
72 let name = name.as_ref();
73 if name.is_empty() {
74 return None;
75 }
76
77 Some(Self(get_sanitized_name(name)))
78 }
79
80 pub(crate) fn scoped<N: AsRef<str>>(parent: &Name, name: N) -> Option<Self> {
81 let name = name.as_ref();
82 if name.is_empty() {
83 return None;
84 }
85
86 let sanitized_name = get_sanitized_name(name);
87 let scoped_name = format!("{}.{}", parent.0, sanitized_name);
88 Some(Self(scoped_name.into()))
89 }
90}
91
92impl Deref for Name {
93 type Target = str;
94
95 fn deref(&self) -> &Self::Target {
96 &self.0
97 }
98}
99
100#[derive(Clone)]
102pub struct Process {
103 id: Id,
104 name: Name,
105 alloc_group_token: AllocationGroupToken,
106 dataspace: DataspaceRegistry,
107}
108
109impl Process {
110 pub(crate) fn supervisor<N: AsRef<str>>(name: N, parent: Option<&Process>) -> Option<Self> {
111 let name = parent
112 .and_then(|p| Name::scoped(&p.name, &name))
113 .or_else(|| Name::root(name))?;
114 let alloc_group_token = AllocationGroupRegistry::global().register_allocation_group(&*name);
115 let dataspace = parent.map(|p| p.dataspace.clone()).unwrap_or_default();
116 Some(Self::from_parts(Id::new(), name, alloc_group_token, dataspace))
117 }
118
119 pub(crate) fn supervisor_with_dataspace<N: AsRef<str>>(
120 name: N, parent: Option<&Process>, dataspace: Option<DataspaceRegistry>,
121 ) -> Option<Self> {
122 let name = parent
123 .and_then(|p| Name::scoped(&p.name, &name))
124 .or_else(|| Name::root(name))?;
125 let alloc_group_token = AllocationGroupRegistry::global().register_allocation_group(&*name);
126 let dataspace = dataspace
127 .or_else(|| parent.map(|p| p.dataspace.clone()))
128 .unwrap_or_default();
129 Some(Self::from_parts(Id::new(), name, alloc_group_token, dataspace))
130 }
131
132 pub(crate) fn worker<N: AsRef<str>>(name: N, parent: &Process) -> Option<Self> {
133 let name = Name::scoped(&parent.name, name)?;
134 Some(Self::from_parts(
135 Id::new(),
136 name,
137 parent.alloc_group_token,
138 parent.dataspace.clone(),
139 ))
140 }
141
142 fn from_parts(id: Id, name: Name, alloc_group_token: AllocationGroupToken, dataspace: DataspaceRegistry) -> Self {
143 Self {
144 id,
145 name,
146 alloc_group_token,
147 dataspace,
148 }
149 }
150
151 pub fn id(&self) -> &Id {
153 &self.id
154 }
155
156 pub(crate) fn dataspace(&self) -> &DataspaceRegistry {
158 &self.dataspace
159 }
160
161 pub fn into_process_future<F>(self, inner: F) -> ProcessFuture<F>
162 where
163 F: Future,
164 {
165 ProcessFuture::new(self, inner)
166 }
167}
168
169#[pin_project(PinnedDrop)]
174pub struct ProcessFuture<F> {
175 process_id: Id,
176 dataspace: DataspaceRegistry,
177 #[pin]
178 inner: Instrumented<Tracked<F>>,
179}
180
181impl<F> ProcessFuture<F>
182where
183 F: Future,
184{
185 pub(crate) fn new(process: Process, inner: F) -> Self {
186 let span = debug_span!(
187 "process",
188 process_id = process.id().as_usize(),
189 process_name = &*process.name,
190 );
191
192 let process_id = process.id;
193 let dataspace = process.dataspace.clone();
194 let inner = inner.track_allocations(process.alloc_group_token).instrument(span);
195
196 Self {
197 process_id,
198 dataspace,
199 inner,
200 }
201 }
202}
203
204impl<F> Future for ProcessFuture<F>
205where
206 F: Future,
207{
208 type Output = F::Output;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 let this = self.project();
212 CURRENT_PROCESS_ID.sync_scope(*this.process_id, || {
213 CURRENT_DATASPACE.sync_scope(this.dataspace.clone(), || this.inner.poll(cx))
214 })
215 }
216}
217
218#[pinned_drop]
219impl<F> PinnedDrop for ProcessFuture<F> {
220 fn drop(self: Pin<&mut Self>) {
221 let this = self.project();
222 this.dataspace.retract_all_for_process(*this.process_id);
223 }
224}
225
226pub trait ProcessExt {
228 fn into_process_future(self, process: Process) -> ProcessFuture<Self>
230 where
231 Self: Future + Sized;
232}
233
234impl<F> ProcessExt for F
235where
236 F: Future,
237{
238 fn into_process_future(self, process: Process) -> ProcessFuture<Self>
239 where
240 Self: Future + Sized,
241 {
242 process.into_process_future(self)
243 }
244}
245
/// Returns `true` when `name` can be used as a process name segment without sanitization.
///
/// A valid segment is non-empty, starts and ends with an alphanumeric character, and contains
/// only alphanumeric characters and underscores.
fn is_process_name_segment_valid(name: &str) -> bool {
    // An empty string has no first or last character, so the edge checks already reject it.
    name.starts_with(char::is_alphanumeric)
        && name.ends_with(char::is_alphanumeric)
        && name.chars().all(|c| c.is_alphanumeric() || c == '_')
}
268
269fn get_sanitized_name(name: &str) -> Arc<str> {
270 if is_process_name_segment_valid(name) {
271 name.into()
272 } else {
273 let raw_sanitized = name
275 .chars()
276 .map(|c| if c.is_alphanumeric() || c == '_' { c } else { '_' });
277 let mut sanitized = String::with_capacity(name.len());
278
279 let mut last_was_underscore = true;
280 for c in raw_sanitized {
281 if c == '_' {
282 if !last_was_underscore {
283 sanitized.push(c);
284 last_was_underscore = true;
285 }
286 } else {
287 sanitized.push(c);
288 last_was_underscore = false;
289 }
290 }
291
292 let trimmed = sanitized.trim_matches(|c: char| !c.is_alphanumeric());
294 Arc::from(trimmed)
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Root names are sanitized, and empty input is rejected.
    #[test]
    fn test_process_name_root() {
        let cases = [
            ("topology_sup", Some("topology_sup")),
            ("worker", Some("worker")),
            ("worker.", Some("worker")),
            ("_worker_", Some("worker")),
            ("worker-123", Some("worker_123")),
            ("--worker_123", Some("worker_123")),
            ("worker 123", Some("worker_123")),
            ("worker===123", Some("worker_123")),
            ("topology.worker", Some("topology_worker")),
            ("", None),
        ];

        for (input, expected) in cases {
            let name = Name::root(input);
            // Name the offending input so a failing table entry is identifiable.
            assert_eq!(name.as_deref(), expected, "root name for input {:?}", input);
        }
    }

    /// Scoped names are `<parent>.<sanitized segment>`, and empty input is rejected.
    #[test]
    fn test_process_name_scoped() {
        let parent = Name::root("topology_sup").unwrap();
        let cases = [
            ("worker", Some("topology_sup.worker")),
            ("worker.", Some("topology_sup.worker")),
            ("_worker_", Some("topology_sup.worker")),
            ("worker-123", Some("topology_sup.worker_123")),
            ("--worker_123", Some("topology_sup.worker_123")),
            ("worker 123", Some("topology_sup.worker_123")),
            ("worker===123", Some("topology_sup.worker_123")),
            ("nested.worker", Some("topology_sup.nested_worker")),
            ("", None),
        ];

        for (input, expected) in cases {
            let name = Name::scoped(&parent, input);
            // Name the offending input so a failing table entry is identifiable.
            assert_eq!(name.as_deref(), expected, "scoped name for input {:?}", input);
        }
    }
}