saluki_app/metrics/
mod.rs1use std::time::Duration;
4
5use async_trait::async_trait;
6use metrics::{gauge, Level};
7use saluki_common::sync::shutdown::ShutdownHandle;
8use saluki_core::{
9 observability::metrics::MetricsFlusherWorker,
10 runtime::{InitializationError, Supervisable, SupervisorFuture},
11};
12use saluki_error::GenericError;
13use saluki_metrics::static_metrics;
14use tokio::{runtime::Handle, select, time::sleep};
15
16mod api;
17pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
18
/// The set of workers produced by [`initialize_metrics`].
///
/// Each field is a separate worker; callers are responsible for running them.
pub(crate) struct MetricsWorkers {
    /// Collects Tokio runtime metrics. `initialize_metrics` creates it for the runtime that is current when it
    /// runs, using the runtime ID `"primary"`.
    pub runtime: RuntimeMetricsWorker,
    /// Flusher worker returned by `saluki_core::observability::metrics::initialize_metrics`.
    pub flusher: MetricsFlusherWorker,
    /// Worker built from the filter handle returned by core metrics initialization. Presumably it applies
    /// runtime overrides to metric filtering — confirm against `MetricsOverrideWorker`.
    pub override_processor: MetricsOverrideWorker,
}
30
31pub(crate) async fn initialize_metrics(
44 metrics_prefix: impl Into<String>, default_level: Level,
45) -> Result<MetricsWorkers, GenericError> {
46 let (filter_handle, flusher) =
52 saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into(), default_level).await?;
53
54 let override_processor = MetricsOverrideWorker::new(filter_handle);
55
56 let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
59
60 Ok(MetricsWorkers {
61 runtime,
62 flusher,
63 override_processor,
64 })
65}
66
67pub fn emit_startup_metrics() {
74 let app_details = saluki_metadata::get_app_details();
75 let app_version = if app_details.is_dev_build() {
76 format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
77 } else {
78 app_details.version().raw().to_string()
79 };
80
81 gauge!("running", "version" => app_version).set(1.0);
83}
84
85pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
90 let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
92
93 loop {
95 let latest_runtime_metrics = handle.metrics();
96 runtime_metrics.update(&latest_runtime_metrics);
97
98 sleep(Duration::from_secs(5)).await;
99 }
100}
101
/// Supervisable worker that periodically collects metrics for a single Tokio runtime.
///
/// Collection runs inside the future returned by its `Supervisable::initialize` implementation. That future stops
/// when the process shutdown handle resolves.
pub struct RuntimeMetricsWorker {
    /// Value used for the `runtime_id` label on every emitted runtime and worker metric.
    runtime_id: String,
    /// Handle to the runtime whose metrics are read.
    handle: Handle,
}
110
111impl RuntimeMetricsWorker {
112 pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
114 Self {
115 runtime_id: runtime_id.into(),
116 handle,
117 }
118 }
119}
120
121#[async_trait]
122impl Supervisable for RuntimeMetricsWorker {
123 fn name(&self) -> &str {
124 "tokio-runtime-metrics-collector"
125 }
126
127 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
128 let runtime_id = self.runtime_id.clone();
129 let handle = self.handle.clone();
130
131 Ok(Box::pin(async move {
132 select! {
133 _ = process_shutdown => {},
134 _ = collect_runtime_metrics(&runtime_id, handle) => {},
135 }
136
137 Ok(())
138 }))
139 }
140}
141
// Per-worker Tokio runtime gauges, generated as the `WorkerMetrics` type by `static_metrics!`.
//
// Every gauge carries the `runtime_id` and `worker_idx` labels (built by `WorkerMetrics::with_worker_idx`).
// All of them are refreshed by `WorkerMetrics::update`. The duration-valued metrics (`mean_poll_time`,
// `total_busy_duration`) are recorded in nanoseconds.
//
// `trace_gauge` presumably registers these at trace level — confirm against `saluki_metrics`.
// Plain `//` comments are used because rustdoc ignores doc comments placed on macro invocations.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        trace_gauge(total_busy_duration),
    ]
);
160
161impl WorkerMetrics {
162 fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
163 Self::new(runtime_id.to_string(), worker_idx.to_string())
164 }
165
166 fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
167 self.local_queue_depth()
168 .set(metrics.worker_local_queue_depth(worker_idx) as f64);
169 self.local_schedule_count()
170 .set(metrics.worker_local_schedule_count(worker_idx) as f64);
171 self.mean_poll_time()
172 .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
173 self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
174 self.overflow_count()
175 .set(metrics.worker_overflow_count(worker_idx) as f64);
176 self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
177 self.park_unpark_count()
178 .set(metrics.worker_park_unpark_count(worker_idx) as f64);
179 self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
180 self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
181 self.steal_operations()
182 .set(metrics.worker_steal_operations(worker_idx) as f64);
183 self.total_busy_duration()
184 .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
185 }
186}
187
// Runtime-wide Tokio gauges, generated as the `GlobalRuntimeMetrics` type by `static_metrics!`.
//
// Every gauge carries the `runtime_id` label. All of them are refreshed by `RuntimeMetrics::update`.
//
// `debug_gauge` presumably registers these at debug level — confirm against `saluki_metrics`.
// Plain `//` comments are used because rustdoc ignores doc comments placed on macro invocations.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        debug_gauge(num_alive_tasks),
        debug_gauge(blocking_queue_depth),
        debug_gauge(budget_forced_yield_count),
        debug_gauge(global_queue_depth),
        debug_gauge(io_driver_fd_deregistered_count),
        debug_gauge(io_driver_fd_registered_count),
        debug_gauge(io_driver_ready_count),
        debug_gauge(num_blocking_threads),
        debug_gauge(num_idle_blocking_threads),
        debug_gauge(num_workers),
        debug_gauge(remote_schedule_count),
        debug_gauge(spawned_tasks_count),
    ],
);
207
/// Gauge handles for one Tokio runtime: one runtime-wide set plus one set per runtime worker.
struct RuntimeMetrics {
    /// Runtime-wide gauges, labelled with the runtime ID.
    global: GlobalRuntimeMetrics,
    /// Per-worker gauges. An entry's position in this vector is the worker index used when querying Tokio.
    workers: Vec<WorkerMetrics>,
}
212
213impl RuntimeMetrics {
214 fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
215 let mut workers = Vec::with_capacity(workers_len);
216 for i in 0..workers_len {
217 workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
218 }
219
220 Self {
221 global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
222 workers,
223 }
224 }
225
226 fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
227 self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
228 self.global
229 .blocking_queue_depth()
230 .set(metrics.blocking_queue_depth() as f64);
231 self.global
232 .budget_forced_yield_count()
233 .set(metrics.budget_forced_yield_count() as f64);
234 self.global
235 .global_queue_depth()
236 .set(metrics.global_queue_depth() as f64);
237 self.global
238 .io_driver_fd_deregistered_count()
239 .set(metrics.io_driver_fd_deregistered_count() as f64);
240 self.global
241 .io_driver_fd_registered_count()
242 .set(metrics.io_driver_fd_registered_count() as f64);
243 self.global
244 .io_driver_ready_count()
245 .set(metrics.io_driver_ready_count() as f64);
246 self.global
247 .num_blocking_threads()
248 .set(metrics.num_blocking_threads() as f64);
249 self.global
250 .num_idle_blocking_threads()
251 .set(metrics.num_idle_blocking_threads() as f64);
252 self.global.num_workers().set(metrics.num_workers() as f64);
253 self.global
254 .remote_schedule_count()
255 .set(metrics.remote_schedule_count() as f64);
256 self.global
257 .spawned_tasks_count()
258 .set(metrics.spawned_tasks_count() as f64);
259
260 for (worker_idx, worker) in self.workers.iter().enumerate() {
261 worker.update(worker_idx, metrics);
262 }
263 }
264}