Skip to main content

saluki_core/runtime/
process.rs

1use std::{
2    future::Future,
3    ops::Deref,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicUsize, Ordering::Relaxed},
7        Arc,
8    },
9    task::{Context, Poll},
10};
11
12use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken, Track as _, Tracked};
13use pin_project::{pin_project, pinned_drop};
14use tracing::{debug_span, instrument::Instrumented, Instrument as _};
15
16use super::state::{DataspaceRegistry, CURRENT_DATASPACE};
17
// Source of the numeric values handed out by `Id::new`.
//
// Starts at 1, which keeps freshly generated identifiers distinct from `Id::ROOT` (0).
static GLOBAL_PROCESS_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);
19
/// Process identifier.
///
/// A simple, numeric identifier that uniquely identifies a process.
///
/// Identifiers created with [`Id::new`] are taken from a global atomic counter that is incremented on every call, so
/// each newly created identifier is distinct from the ones created before it. [`Id::ROOT`] is a fixed, shared value
/// that stands for "no process".
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Id(usize);
27
// Identifier of the process whose future is currently being polled.
//
// `ProcessFuture::poll` installs the value for the duration of each poll, and `Id::current` reads it back (falling
// back to `Id::ROOT` when nothing is installed).
tokio::task_local! {
    pub(crate) static CURRENT_PROCESS_ID: Id;
}
31
32impl Id {
33    /// The root process identifier, representing the global/unnamed process context.
34    pub const ROOT: Self = Self(0);
35
36    /// Creates a new process identifier.
37    pub fn new() -> Self {
38        let id = GLOBAL_PROCESS_ID_COUNTER.fetch_add(1, Relaxed);
39        Self(id)
40    }
41
42    /// Returns the process identifier for the currently executing process.
43    ///
44    /// If called outside of a process context, returns `Id::ROOT`.
45    pub fn current() -> Self {
46        CURRENT_PROCESS_ID.try_with(|id| *id).unwrap_or(Self::ROOT)
47    }
48
49    /// Returns the raw numeric value of this identifier.
50    pub fn as_usize(&self) -> usize {
51        self.0
52    }
53}
54
55/// Process name.
56///
57/// A human-readable name for a process that only contains alphanumeric characters, underscores, and periods.
58///
59/// Process names are scoped, such that the resulting process name is nested. For example, if a supervisor has a name of
60/// `topology_sup`, and a child process is added to that supervisor with a name of `worker`, the resulting process name
61/// for the child process will be `topology_sup.worker`. Process names can be arbitrarily nested in this way.
62///
63/// Process names will be sanitized if they contain invalid characters, such as hyphens or spaces. Invalid characters
64/// will be replaced with underscores.
65///
66/// Not guaranteed to be unique.
67#[derive(Clone, Debug, PartialEq, Eq, Hash)]
68pub struct Name(Arc<str>);
69
70impl Name {
71    pub(crate) fn root<N: AsRef<str>>(name: N) -> Option<Self> {
72        let name = name.as_ref();
73        if name.is_empty() {
74            return None;
75        }
76
77        Some(Self(get_sanitized_name(name)))
78    }
79
80    pub(crate) fn scoped<N: AsRef<str>>(parent: &Name, name: N) -> Option<Self> {
81        let name = name.as_ref();
82        if name.is_empty() {
83            return None;
84        }
85
86        let sanitized_name = get_sanitized_name(name);
87        let scoped_name = format!("{}.{}", parent.0, sanitized_name);
88        Some(Self(scoped_name.into()))
89    }
90}
91
92impl Deref for Name {
93    type Target = str;
94
95    fn deref(&self) -> &Self::Target {
96        &self.0
97    }
98}
99
/// A runtime process.
///
/// Bundles the identity of a process (its numeric [`Id`] and hierarchical [`Name`]) with the allocation group token
/// and dataspace registry that [`ProcessFuture`] applies to the future running as this process.
#[derive(Clone)]
pub struct Process {
    /// Numeric identifier, assigned via `Id::new` when the process is constructed.
    id: Id,
    /// Full (possibly scoped) name of the process.
    name: Name,
    /// Allocation group used to track this process's allocations.
    ///
    /// Supervisors register a group under their own name; workers reuse their parent's token.
    alloc_group_token: AllocationGroupToken,
    /// Dataspace registry made available to the process while it is being polled.
    dataspace: DataspaceRegistry,
}
108
109impl Process {
110    pub(crate) fn supervisor<N: AsRef<str>>(name: N, parent: Option<&Process>) -> Option<Self> {
111        let name = parent
112            .and_then(|p| Name::scoped(&p.name, &name))
113            .or_else(|| Name::root(name))?;
114        let alloc_group_token = AllocationGroupRegistry::global().register_allocation_group(&*name);
115        let dataspace = parent.map(|p| p.dataspace.clone()).unwrap_or_default();
116        Some(Self::from_parts(Id::new(), name, alloc_group_token, dataspace))
117    }
118
119    pub(crate) fn supervisor_with_dataspace<N: AsRef<str>>(
120        name: N, parent: Option<&Process>, dataspace: Option<DataspaceRegistry>,
121    ) -> Option<Self> {
122        let name = parent
123            .and_then(|p| Name::scoped(&p.name, &name))
124            .or_else(|| Name::root(name))?;
125        let alloc_group_token = AllocationGroupRegistry::global().register_allocation_group(&*name);
126        let dataspace = dataspace
127            .or_else(|| parent.map(|p| p.dataspace.clone()))
128            .unwrap_or_default();
129        Some(Self::from_parts(Id::new(), name, alloc_group_token, dataspace))
130    }
131
132    pub(crate) fn worker<N: AsRef<str>>(name: N, parent: &Process) -> Option<Self> {
133        let name = Name::scoped(&parent.name, name)?;
134        Some(Self::from_parts(
135            Id::new(),
136            name,
137            parent.alloc_group_token,
138            parent.dataspace.clone(),
139        ))
140    }
141
142    fn from_parts(id: Id, name: Name, alloc_group_token: AllocationGroupToken, dataspace: DataspaceRegistry) -> Self {
143        Self {
144            id,
145            name,
146            alloc_group_token,
147            dataspace,
148        }
149    }
150
151    /// Returns the process identifier.
152    pub fn id(&self) -> &Id {
153        &self.id
154    }
155
156    /// Returns the dataspace registry associated with this process.
157    pub(crate) fn dataspace(&self) -> &DataspaceRegistry {
158        &self.dataspace
159    }
160
161    pub fn into_process_future<F>(self, inner: F) -> ProcessFuture<F>
162    where
163        F: Future,
164    {
165        ProcessFuture::new(self, inner)
166    }
167}
168
/// A process future.
///
/// Wraps a [`Future`] with process-specific instrumentation and globals. This ensures that processes have properly scoped tracing and
/// allocation tracking behavior, as well as access to supervisor/runtime-specific globals, such as the dataspace registry.
///
/// When the future is dropped, the dataspace registry is asked to retract everything associated with the process
/// identifier (see the `PinnedDrop` implementation).
#[pin_project(PinnedDrop)]
pub struct ProcessFuture<F> {
    // Identifier installed as `CURRENT_PROCESS_ID` while `inner` is polled, and handed to the dataspace on drop.
    process_id: Id,
    // Registry installed as `CURRENT_DATASPACE` while `inner` is polled.
    dataspace: DataspaceRegistry,
    // The wrapped future: allocation tracking is applied first, with the tracing span as the outermost layer.
    #[pin]
    inner: Instrumented<Tracked<F>>,
}
180
181impl<F> ProcessFuture<F>
182where
183    F: Future,
184{
185    pub(crate) fn new(process: Process, inner: F) -> Self {
186        let span = debug_span!(
187            "process",
188            process_id = process.id().as_usize(),
189            process_name = &*process.name,
190        );
191
192        let process_id = process.id;
193        let dataspace = process.dataspace.clone();
194        let inner = inner.track_allocations(process.alloc_group_token).instrument(span);
195
196        Self {
197            process_id,
198            dataspace,
199            inner,
200        }
201    }
202}
203
204impl<F> Future for ProcessFuture<F>
205where
206    F: Future,
207{
208    type Output = F::Output;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211        let this = self.project();
212        CURRENT_PROCESS_ID.sync_scope(*this.process_id, || {
213            CURRENT_DATASPACE.sync_scope(this.dataspace.clone(), || this.inner.poll(cx))
214        })
215    }
216}
217
218#[pinned_drop]
219impl<F> PinnedDrop for ProcessFuture<F> {
220    fn drop(self: Pin<&mut Self>) {
221        let this = self.project();
222        this.dataspace.retract_all_for_process(*this.process_id);
223    }
224}
225
/// Helper trait for running process futures.
///
/// Provides a method-call form (`future.into_process_future(process)`) for wrapping a future in a
/// [`ProcessFuture`].
pub trait ProcessExt {
    /// Converts the future into a process future.
    ///
    /// Consumes both the future and `process`, returning a [`ProcessFuture`] that wraps the future.
    fn into_process_future(self, process: Process) -> ProcessFuture<Self>
    where
        Self: Future + Sized;
}
233
234impl<F> ProcessExt for F
235where
236    F: Future,
237{
238    fn into_process_future(self, process: Process) -> ProcessFuture<Self>
239    where
240        Self: Future + Sized,
241    {
242        process.into_process_future(self)
243    }
244}
245
/// Returns `true` if `name` can be used as a process name segment without sanitization.
///
/// A valid segment is non-empty, starts and ends with an alphanumeric character, and otherwise consists solely of
/// alphanumeric characters and underscores. Periods are not allowed within a segment: they only appear in full
/// process names, as the separator between segments.
fn is_process_name_segment_valid(name: &str) -> bool {
    let is_edge_char = |c: char| c.is_alphanumeric();
    let is_segment_char = |c: char| c.is_alphanumeric() || c == '_';

    !name.is_empty()
        && name.starts_with(is_edge_char)
        && name.ends_with(is_edge_char)
        && name.chars().all(is_segment_char)
}
268
269fn get_sanitized_name(name: &str) -> Arc<str> {
270    if is_process_name_segment_valid(name) {
271        name.into()
272    } else {
273        // Replace invalid characters with underscores, and collapses multiple underscores into a single one.
274        let raw_sanitized = name
275            .chars()
276            .map(|c| if c.is_alphanumeric() || c == '_' { c } else { '_' });
277        let mut sanitized = String::with_capacity(name.len());
278
279        let mut last_was_underscore = true;
280        for c in raw_sanitized {
281            if c == '_' {
282                if !last_was_underscore {
283                    sanitized.push(c);
284                    last_was_underscore = true;
285                }
286            } else {
287                sanitized.push(c);
288                last_was_underscore = false;
289            }
290        }
291
292        // Remove all non-alphanumeric characters from beginning and end.
293        let trimmed = sanitized.trim_matches(|c: char| !c.is_alphanumeric());
294        Arc::from(trimmed)
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Root names: valid inputs pass through, invalid characters are sanitized, and empty input is rejected.
    #[test]
    fn test_process_name_root() {
        // (raw input, expected root name)
        const CASES: &[(&str, Option<&str>)] = &[
            ("topology_sup", Some("topology_sup")),
            ("worker", Some("worker")),
            ("worker.", Some("worker")),
            ("_worker_", Some("worker")),
            ("worker-123", Some("worker_123")),
            ("--worker_123", Some("worker_123")),
            ("worker 123", Some("worker_123")),
            ("worker===123", Some("worker_123")),
            ("topology.worker", Some("topology_worker")),
            ("", None),
        ];

        for &(raw, want) in CASES {
            let got = Name::root(raw);
            assert_eq!(got.as_deref(), want);
        }
    }

    /// Scoped names: the sanitized segment is appended to the parent name after a period.
    #[test]
    fn test_process_name_scoped() {
        // (raw input, expected name when scoped under `topology_sup`)
        const CASES: &[(&str, Option<&str>)] = &[
            ("worker", Some("topology_sup.worker")),
            ("worker.", Some("topology_sup.worker")),
            ("_worker_", Some("topology_sup.worker")),
            ("worker-123", Some("topology_sup.worker_123")),
            ("--worker_123", Some("topology_sup.worker_123")),
            ("worker 123", Some("topology_sup.worker_123")),
            ("worker===123", Some("topology_sup.worker_123")),
            ("nested.worker", Some("topology_sup.nested_worker")),
            ("", None),
        ];

        let parent = Name::root("topology_sup").unwrap();
        for &(raw, want) in CASES {
            let got = Name::scoped(&parent, raw);
            assert_eq!(got.as_deref(), want);
        }
    }
}
343}