// saluki_app/metrics/mod.rs
use std::{sync::Mutex, time::Duration};
4
5use async_trait::async_trait;
6use metrics::gauge;
7use saluki_core::{
8 observability::metrics::MetricsFlusherWorker,
9 runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
10};
11use saluki_error::GenericError;
12use saluki_metrics::static_metrics;
13use tokio::{runtime::Handle, select, time::sleep};
14
// Holds the metrics API handler between `initialize_metrics` (which stores it) and
// `acquire_metrics_api_handler` (which takes it out). `None` before initialization
// and after the handler has been claimed.
static API_HANDLER: Mutex<Option<MetricsAPIHandler>> = Mutex::new(None);

mod api;
pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
19
/// Background workers produced by [`initialize_metrics`].
///
/// Callers are expected to hand these to the process supervisor so they
/// actually run.
pub(crate) struct MetricsWorkers {
    // Periodically collects Tokio runtime metrics for the primary runtime.
    pub runtime: RuntimeMetricsWorker,
    // Flushes recorded metrics; created by `saluki_core`'s metrics initialization.
    pub flusher: MetricsFlusherWorker,
    // Processes metrics override requests, paired with the API handler.
    pub override_processor: MetricsOverrideWorker,
}
30
31pub(crate) async fn initialize_metrics(metrics_prefix: impl Into<String>) -> Result<MetricsWorkers, GenericError> {
43 let (filter_handle, flusher) =
49 saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into()).await?;
50
51 let (api_handler, override_processor) = MetricsAPIHandler::new(filter_handle);
52 API_HANDLER.lock().unwrap().replace(api_handler);
53
54 let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
57
58 Ok(MetricsWorkers {
59 runtime,
60 flusher,
61 override_processor,
62 })
63}
64
65pub fn acquire_metrics_api_handler() -> Option<MetricsAPIHandler> {
73 API_HANDLER.lock().unwrap().take()
74}
75
76pub fn emit_startup_metrics() {
83 let app_details = saluki_metadata::get_app_details();
84 let app_version = if app_details.is_dev_build() {
85 format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
86 } else {
87 app_details.version().raw().to_string()
88 };
89
90 gauge!("running", "version" => app_version).set(1.0);
92}
93
94pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
99 let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
101
102 loop {
104 let latest_runtime_metrics = handle.metrics();
105 runtime_metrics.update(&latest_runtime_metrics);
106
107 sleep(Duration::from_secs(5)).await;
108 }
109}
110
/// A supervised worker that periodically collects Tokio runtime metrics.
///
/// Wraps [`collect_runtime_metrics`] so it can run under the process
/// supervisor (see the `Supervisable` implementation).
pub struct RuntimeMetricsWorker {
    // Identifier used as the `runtime_id` metric label.
    runtime_id: String,
    // Handle of the runtime whose metrics are collected.
    handle: Handle,
}
119
120impl RuntimeMetricsWorker {
121 pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
123 Self {
124 runtime_id: runtime_id.into(),
125 handle,
126 }
127 }
128}
129
130#[async_trait]
131impl Supervisable for RuntimeMetricsWorker {
132 fn name(&self) -> &str {
133 "tokio-runtime-metrics-collector"
134 }
135
136 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
137 let runtime_id = self.runtime_id.clone();
138 let handle = self.handle.clone();
139
140 Ok(Box::pin(async move {
141 select! {
142 _ = collect_runtime_metrics(&runtime_id, handle) => {},
143 _ = process_shutdown.wait_for_shutdown() => {},
144 }
145 Ok(())
146 }))
147 }
148}
149
// Per-worker Tokio runtime metric gauges, prefixed `runtime_worker` and
// labeled with the owning runtime's ID and the worker's index.
// NOTE(review): `trace_gauge` appears to be a variant of `gauge` — confirm its
// exact semantics against `static_metrics!` in `saluki_metrics`.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        trace_gauge(total_busy_duration),
    ]
);
168
169impl WorkerMetrics {
170 fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
171 Self::new(runtime_id.to_string(), worker_idx.to_string())
172 }
173
174 fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
175 self.local_queue_depth()
176 .set(metrics.worker_local_queue_depth(worker_idx) as f64);
177 self.local_schedule_count()
178 .set(metrics.worker_local_schedule_count(worker_idx) as f64);
179 self.mean_poll_time()
180 .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
181 self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
182 self.overflow_count()
183 .set(metrics.worker_overflow_count(worker_idx) as f64);
184 self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
185 self.park_unpark_count()
186 .set(metrics.worker_park_unpark_count(worker_idx) as f64);
187 self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
188 self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
189 self.steal_operations()
190 .set(metrics.worker_steal_operations(worker_idx) as f64);
191 self.total_busy_duration()
192 .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
193 }
194}
195
// Runtime-wide Tokio runtime metric gauges, prefixed `runtime` and labeled
// with the runtime's ID.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        gauge(num_alive_tasks),
        gauge(blocking_queue_depth),
        gauge(budget_forced_yield_count),
        gauge(global_queue_depth),
        gauge(io_driver_fd_deregistered_count),
        gauge(io_driver_fd_registered_count),
        gauge(io_driver_ready_count),
        gauge(num_blocking_threads),
        gauge(num_idle_blocking_threads),
        gauge(num_workers),
        gauge(remote_schedule_count),
        gauge(spawned_tasks_count),
    ],
);
215
/// Aggregated gauge sets for one Tokio runtime: the runtime-wide gauges plus
/// one gauge set per worker.
struct RuntimeMetrics {
    // Runtime-wide gauges.
    global: GlobalRuntimeMetrics,
    // One entry per runtime worker, indexed by worker index.
    workers: Vec<WorkerMetrics>,
}
220
221impl RuntimeMetrics {
222 fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
223 let mut workers = Vec::with_capacity(workers_len);
224 for i in 0..workers_len {
225 workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
226 }
227
228 Self {
229 global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
230 workers,
231 }
232 }
233
234 fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
235 self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
236 self.global
237 .blocking_queue_depth()
238 .set(metrics.blocking_queue_depth() as f64);
239 self.global
240 .budget_forced_yield_count()
241 .set(metrics.budget_forced_yield_count() as f64);
242 self.global
243 .global_queue_depth()
244 .set(metrics.global_queue_depth() as f64);
245 self.global
246 .io_driver_fd_deregistered_count()
247 .set(metrics.io_driver_fd_deregistered_count() as f64);
248 self.global
249 .io_driver_fd_registered_count()
250 .set(metrics.io_driver_fd_registered_count() as f64);
251 self.global
252 .io_driver_ready_count()
253 .set(metrics.io_driver_ready_count() as f64);
254 self.global
255 .num_blocking_threads()
256 .set(metrics.num_blocking_threads() as f64);
257 self.global
258 .num_idle_blocking_threads()
259 .set(metrics.num_idle_blocking_threads() as f64);
260 self.global.num_workers().set(metrics.num_workers() as f64);
261 self.global
262 .remote_schedule_count()
263 .set(metrics.remote_schedule_count() as f64);
264 self.global
265 .spawned_tasks_count()
266 .set(metrics.spawned_tasks_count() as f64);
267
268 for (worker_idx, worker) in self.workers.iter().enumerate() {
269 worker.update(worker_idx, metrics);
270 }
271 }
272}