saluki_common/task/
mod.rs

1//! Helpers for working with asynchronous tasks.
2
3use std::future::Future;
4
5use memory_accounting::allocator::Track as _;
6use tokio::{
7    runtime::Handle,
8    task::{AbortHandle, JoinHandle, JoinSet},
9};
10use tracing::Instrument as _;
11
12mod instrument;
13use self::instrument::TaskInstrument as _;
14
15/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
16///
17/// This function is a thin wrapper over [`tokio::spawn`], and provides implicit "tracing" for spawned futures by
18/// ensuring that the task is attached to the current `tracing` span and the current allocation component.
19#[track_caller]
20pub fn spawn_traced<F, T>(f: F) -> JoinHandle<T>
21where
22    F: Future<Output = T> + Send + 'static,
23    T: Send + 'static,
24{
25    tokio::task::spawn(
26        f.in_current_span()
27            .in_current_allocation_group()
28            .with_task_instrumentation(get_caller_location_as_string()),
29    )
30}
31
32/// Spawns a new named asynchronous task, returning a [`JoinHandle`] for it.
33///
34/// This function is a thin wrapper over [`tokio::spawn`], and provides implicit "tracing" for spawned futures by
35/// ensuring that the task is attached to the current `tracing` span and the current allocation component.
36pub fn spawn_traced_named<S, F, T>(name: S, f: F) -> JoinHandle<T>
37where
38    S: Into<String>,
39    F: Future<Output = T> + Send + 'static,
40    T: Send + 'static,
41{
42    tokio::task::spawn(
43        f.in_current_span()
44            .in_current_allocation_group()
45            .with_task_instrumentation(name.into()),
46    )
47}
48
49/// Helper trait for providing traced spawning when using `JoinSet<T>`.
50pub trait JoinSetExt<T> {
51    /// Spawns a new asynchronous task, returning an [`AbortHandle`] for it.
52    ///
53    /// This is meant to be a thin wrapper over task management types like [`JoinSet`], and provides implicit "tracing"
54    /// for spawned futures by ensuring that the task is attached to the current `tracing` span and the current
55    /// allocation component.
56    fn spawn_traced<F>(&mut self, f: F) -> AbortHandle
57    where
58        F: Future<Output = T> + Send + 'static,
59        T: Send + 'static;
60
61    /// Spawns a new named asynchronous task, returning an [`AbortHandle`] for it.
62    ///
63    /// This is meant to be a thin wrapper over task management types like [`JoinSet`], and provides implicit "tracing"
64    /// for spawned futures by ensuring that the task is attached to the current `tracing` span and the current
65    /// allocation component.
66    fn spawn_traced_named<S, F>(&mut self, name: S, f: F) -> AbortHandle
67    where
68        S: Into<String>,
69        F: Future<Output = T> + Send + 'static,
70        T: Send + 'static;
71}
72
73impl<T> JoinSetExt<T> for JoinSet<T> {
74    fn spawn_traced<F>(&mut self, f: F) -> AbortHandle
75    where
76        F: Future<Output = T> + Send + 'static,
77        T: Send + 'static,
78    {
79        self.spawn(
80            f.in_current_span()
81                .in_current_allocation_group()
82                .with_task_instrumentation(get_caller_location_as_string()),
83        )
84    }
85
86    fn spawn_traced_named<S, F>(&mut self, name: S, f: F) -> AbortHandle
87    where
88        S: Into<String>,
89        F: Future<Output = T> + Send + 'static,
90        T: Send + 'static,
91    {
92        self.spawn(
93            f.in_current_span()
94                .in_current_allocation_group()
95                .with_task_instrumentation(name.into()),
96        )
97    }
98}
99
100/// Helper trait for providing traced spawning when using `Handle`.
101pub trait HandleExt<T> {
102    /// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
103    ///
104    /// This is meant to be a thin wrapper over task management types like [`Handle`], and provides implicit "tracing"
105    /// for spawned futures by ensuring that the task is attached to the current `tracing` span and the current
106    /// allocation component.
107    fn spawn_traced<F>(&self, f: F) -> JoinHandle<T>
108    where
109        F: Future<Output = T> + Send + 'static,
110        T: Send + 'static;
111
112    /// Spawns a new named asynchronous task, returning a [`JoinHandle`] for it.
113    ///
114    /// This is meant to be a thin wrapper over task management types like [`Handle`], and provides implicit "tracing"
115    /// for spawned futures by ensuring that the task is attached to the current `tracing` span and the current
116    /// allocation component.
117    fn spawn_traced_named<S, F>(&self, name: S, f: F) -> JoinHandle<T>
118    where
119        S: Into<String>,
120        F: Future<Output = T> + Send + 'static,
121        T: Send + 'static;
122}
123
124impl<T> HandleExt<T> for Handle {
125    fn spawn_traced<F>(&self, f: F) -> JoinHandle<T>
126    where
127        F: Future<Output = T> + Send + 'static,
128        T: Send + 'static,
129    {
130        self.spawn(
131            f.in_current_span()
132                .in_current_allocation_group()
133                .with_task_instrumentation(get_caller_location_as_string()),
134        )
135    }
136
137    fn spawn_traced_named<S, F>(&self, name: S, f: F) -> JoinHandle<T>
138    where
139        S: Into<String>,
140        F: Future<Output = T> + Send + 'static,
141        T: Send + 'static,
142    {
143        self.spawn(
144            f.in_current_span()
145                .in_current_allocation_group()
146                .with_task_instrumentation(name.into()),
147        )
148    }
149}
150
/// Gets the caller location as a string, in the form of `file-<file>@<line>-<column>`.
///
/// The location is that of the code calling this function; because of `#[track_caller]`, when the calling function
/// is itself annotated with `#[track_caller]`, the location of *its* caller is reported instead.
#[track_caller]
pub fn get_caller_location_as_string() -> String {
    let location = std::panic::Location::caller();
    let (file, line, column) = (location.file(), location.line(), location.column());
    format!("file-{file}@{line}-{column}")
}