saluki_app/metrics/
mod.rs1use std::{sync::Mutex, time::Duration};
4
5use metrics::gauge;
6use saluki_common::task::spawn_traced_named;
7use saluki_error::GenericError;
8use saluki_metrics::static_metrics;
9use tokio::{runtime::Handle, time::sleep};
10
/// Process-wide slot holding the metrics API handler.
///
/// `initialize_metrics` stores a freshly built handler here, replacing any previous one.
/// `acquire_metrics_api_handler` moves it out and leaves `None` behind.
static API_HANDLER: Mutex<Option<MetricsAPIHandler>> = Mutex::new(None);
12
13mod api;
14pub use self::api::MetricsAPIHandler;
15
16pub(crate) async fn initialize_metrics(metrics_prefix: impl Into<String>) -> Result<(), GenericError> {
25 let filter_handle = saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into()).await?;
31 API_HANDLER
32 .lock()
33 .unwrap()
34 .replace(MetricsAPIHandler::new(filter_handle));
35
36 spawn_traced_named(
38 "tokio-runtime-metrics-collector-primary",
39 collect_runtime_metrics("primary"),
40 );
41
42 Ok(())
43}
44
45pub fn acquire_metrics_api_handler() -> Option<MetricsAPIHandler> {
53 API_HANDLER.lock().unwrap().take()
54}
55
56pub fn emit_startup_metrics() {
63 let app_details = saluki_metadata::get_app_details();
64 let app_version = if app_details.is_dev_build() {
65 format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
66 } else {
67 app_details.version().raw().to_string()
68 };
69
70 gauge!("running", "version" => app_version).set(1.0);
72}
73
74pub async fn collect_runtime_metrics(runtime_id: &str) {
79 let handle = Handle::current();
83 let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
84
85 loop {
87 let latest_runtime_metrics = handle.metrics();
88 runtime_metrics.update(&latest_runtime_metrics);
89
90 sleep(Duration::from_secs(5)).await;
91 }
92}
93
// Per-worker Tokio runtime metrics, labelled by runtime ID and worker index.
//
// Each metric is populated by `WorkerMetrics::update`. It is set to the value of the matching
// `tokio::runtime::RuntimeMetrics::worker_<name>` accessor, converted to `f64`. Unlike
// `GlobalRuntimeMetrics`, these are declared with `trace_gauge`. NOTE(review): presumably that
// makes them lower-priority/verbose gauges; confirm against the `static_metrics!` definition.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        // Recorded in nanoseconds (see `WorkerMetrics::update`).
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        // Recorded in nanoseconds (see `WorkerMetrics::update`).
        trace_gauge(total_busy_duration),
    ]
);
112
113impl WorkerMetrics {
114 fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
115 Self::new(runtime_id.to_string(), worker_idx.to_string())
116 }
117
118 fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
119 self.local_queue_depth()
120 .set(metrics.worker_local_queue_depth(worker_idx) as f64);
121 self.local_schedule_count()
122 .set(metrics.worker_local_schedule_count(worker_idx) as f64);
123 self.mean_poll_time()
124 .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
125 self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
126 self.overflow_count()
127 .set(metrics.worker_overflow_count(worker_idx) as f64);
128 self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
129 self.park_unpark_count()
130 .set(metrics.worker_park_unpark_count(worker_idx) as f64);
131 self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
132 self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
133 self.steal_operations()
134 .set(metrics.worker_steal_operations(worker_idx) as f64);
135 self.total_busy_duration()
136 .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
137 }
138}
139
// Runtime-wide Tokio metrics, labelled by runtime ID.
//
// Each gauge is populated by `RuntimeMetrics::update`. It is set to the value returned by the
// identically named `tokio::runtime::RuntimeMetrics` method, converted to `f64`.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        gauge(num_alive_tasks),
        gauge(blocking_queue_depth),
        gauge(budget_forced_yield_count),
        gauge(global_queue_depth),
        gauge(io_driver_fd_deregistered_count),
        gauge(io_driver_fd_registered_count),
        gauge(io_driver_ready_count),
        gauge(num_blocking_threads),
        gauge(num_idle_blocking_threads),
        gauge(num_workers),
        gauge(remote_schedule_count),
        gauge(spawned_tasks_count),
    ],
);
159
160struct RuntimeMetrics {
161 global: GlobalRuntimeMetrics,
162 workers: Vec<WorkerMetrics>,
163}
164
165impl RuntimeMetrics {
166 fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
167 let mut workers = Vec::with_capacity(workers_len);
168 for i in 0..workers_len {
169 workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
170 }
171
172 Self {
173 global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
174 workers,
175 }
176 }
177
178 fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
179 self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
180 self.global
181 .blocking_queue_depth()
182 .set(metrics.blocking_queue_depth() as f64);
183 self.global
184 .budget_forced_yield_count()
185 .set(metrics.budget_forced_yield_count() as f64);
186 self.global
187 .global_queue_depth()
188 .set(metrics.global_queue_depth() as f64);
189 self.global
190 .io_driver_fd_deregistered_count()
191 .set(metrics.io_driver_fd_deregistered_count() as f64);
192 self.global
193 .io_driver_fd_registered_count()
194 .set(metrics.io_driver_fd_registered_count() as f64);
195 self.global
196 .io_driver_ready_count()
197 .set(metrics.io_driver_ready_count() as f64);
198 self.global
199 .num_blocking_threads()
200 .set(metrics.num_blocking_threads() as f64);
201 self.global
202 .num_idle_blocking_threads()
203 .set(metrics.num_idle_blocking_threads() as f64);
204 self.global.num_workers().set(metrics.num_workers() as f64);
205 self.global
206 .remote_schedule_count()
207 .set(metrics.remote_schedule_count() as f64);
208 self.global
209 .spawned_tasks_count()
210 .set(metrics.spawned_tasks_count() as f64);
211
212 for (worker_idx, worker) in self.workers.iter().enumerate() {
213 worker.update(worker_idx, metrics);
214 }
215 }
216}