saluki_app/metrics/
mod.rs1use std::time::Duration;
4
5use async_trait::async_trait;
6use metrics::{gauge, Level};
7use saluki_core::{
8 observability::metrics::MetricsFlusherWorker,
9 runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
10};
11use saluki_error::GenericError;
12use saluki_metrics::static_metrics;
13use tokio::{runtime::Handle, select, time::sleep};
14
15mod api;
16pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
17
/// Bundle of the background workers produced by [`initialize_metrics`].
///
/// The caller receives all three workers together and decides how to run them; nothing in this
/// struct starts them on its own.
pub(crate) struct MetricsWorkers {
    /// Worker that collects metrics about the Tokio runtime (see [`RuntimeMetricsWorker`]).
    pub runtime: RuntimeMetricsWorker,
    /// Flusher worker returned by `saluki_core::observability::metrics::initialize_metrics`.
    pub flusher: MetricsFlusherWorker,
    /// Worker built from the filter handle returned by the core metrics initialization.
    // NOTE(review): presumably applies runtime metric-level overrides -- confirm against `api`.
    pub override_processor: MetricsOverrideWorker,
}
29
30pub(crate) async fn initialize_metrics(
43 metrics_prefix: impl Into<String>, default_level: Level,
44) -> Result<MetricsWorkers, GenericError> {
45 let (filter_handle, flusher) =
51 saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into(), default_level).await?;
52
53 let override_processor = MetricsOverrideWorker::new(filter_handle);
54
55 let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
58
59 Ok(MetricsWorkers {
60 runtime,
61 flusher,
62 override_processor,
63 })
64}
65
66pub fn emit_startup_metrics() {
73 let app_details = saluki_metadata::get_app_details();
74 let app_version = if app_details.is_dev_build() {
75 format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
76 } else {
77 app_details.version().raw().to_string()
78 };
79
80 gauge!("running", "version" => app_version).set(1.0);
82}
83
84pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
89 let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
91
92 loop {
94 let latest_runtime_metrics = handle.metrics();
95 runtime_metrics.update(&latest_runtime_metrics);
96
97 sleep(Duration::from_secs(5)).await;
98 }
99}
100
101pub struct RuntimeMetricsWorker {
106 runtime_id: String,
107 handle: Handle,
108}
109
110impl RuntimeMetricsWorker {
111 pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
113 Self {
114 runtime_id: runtime_id.into(),
115 handle,
116 }
117 }
118}
119
120#[async_trait]
121impl Supervisable for RuntimeMetricsWorker {
122 fn name(&self) -> &str {
123 "tokio-runtime-metrics-collector"
124 }
125
126 async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
127 let runtime_id = self.runtime_id.clone();
128 let handle = self.handle.clone();
129
130 Ok(Box::pin(async move {
131 select! {
132 _ = collect_runtime_metrics(&runtime_id, handle) => {},
133 _ = process_shutdown.wait_for_shutdown() => {},
134 }
135 Ok(())
136 }))
137 }
138}
139
// Per-worker runtime metrics.
//
// Declares the `WorkerMetrics` type used by `impl WorkerMetrics` below: it is constructed with
// `WorkerMetrics::new(runtime_id, worker_idx)` and exposes one accessor per metric listed here
// (e.g. `local_queue_depth()`), each of which is updated via `.set(..)`.
//
// NOTE(review): `prefix` presumably namespaces the emitted metric names and `trace_gauge`
// presumably registers gauges at trace verbosity -- confirm against `saluki_metrics::static_metrics`.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        trace_gauge(total_busy_duration),
    ]
);
158
159impl WorkerMetrics {
160 fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
161 Self::new(runtime_id.to_string(), worker_idx.to_string())
162 }
163
164 fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
165 self.local_queue_depth()
166 .set(metrics.worker_local_queue_depth(worker_idx) as f64);
167 self.local_schedule_count()
168 .set(metrics.worker_local_schedule_count(worker_idx) as f64);
169 self.mean_poll_time()
170 .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
171 self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
172 self.overflow_count()
173 .set(metrics.worker_overflow_count(worker_idx) as f64);
174 self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
175 self.park_unpark_count()
176 .set(metrics.worker_park_unpark_count(worker_idx) as f64);
177 self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
178 self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
179 self.steal_operations()
180 .set(metrics.worker_steal_operations(worker_idx) as f64);
181 self.total_busy_duration()
182 .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
183 }
184}
185
// Runtime-wide metrics (not tied to a specific worker).
//
// Declares the `GlobalRuntimeMetrics` type held by `RuntimeMetrics` below: it is constructed with
// `GlobalRuntimeMetrics::new(runtime_id)` and exposes one accessor per metric listed here
// (e.g. `num_alive_tasks()`), each of which is updated via `.set(..)`.
//
// NOTE(review): `prefix` presumably namespaces the emitted metric names and `debug_gauge`
// presumably registers gauges at debug verbosity -- confirm against `saluki_metrics::static_metrics`.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        debug_gauge(num_alive_tasks),
        debug_gauge(blocking_queue_depth),
        debug_gauge(budget_forced_yield_count),
        debug_gauge(global_queue_depth),
        debug_gauge(io_driver_fd_deregistered_count),
        debug_gauge(io_driver_fd_registered_count),
        debug_gauge(io_driver_ready_count),
        debug_gauge(num_blocking_threads),
        debug_gauge(num_idle_blocking_threads),
        debug_gauge(num_workers),
        debug_gauge(remote_schedule_count),
        debug_gauge(spawned_tasks_count),
    ],
);
205
206struct RuntimeMetrics {
207 global: GlobalRuntimeMetrics,
208 workers: Vec<WorkerMetrics>,
209}
210
211impl RuntimeMetrics {
212 fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
213 let mut workers = Vec::with_capacity(workers_len);
214 for i in 0..workers_len {
215 workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
216 }
217
218 Self {
219 global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
220 workers,
221 }
222 }
223
224 fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
225 self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
226 self.global
227 .blocking_queue_depth()
228 .set(metrics.blocking_queue_depth() as f64);
229 self.global
230 .budget_forced_yield_count()
231 .set(metrics.budget_forced_yield_count() as f64);
232 self.global
233 .global_queue_depth()
234 .set(metrics.global_queue_depth() as f64);
235 self.global
236 .io_driver_fd_deregistered_count()
237 .set(metrics.io_driver_fd_deregistered_count() as f64);
238 self.global
239 .io_driver_fd_registered_count()
240 .set(metrics.io_driver_fd_registered_count() as f64);
241 self.global
242 .io_driver_ready_count()
243 .set(metrics.io_driver_ready_count() as f64);
244 self.global
245 .num_blocking_threads()
246 .set(metrics.num_blocking_threads() as f64);
247 self.global
248 .num_idle_blocking_threads()
249 .set(metrics.num_idle_blocking_threads() as f64);
250 self.global.num_workers().set(metrics.num_workers() as f64);
251 self.global
252 .remote_schedule_count()
253 .set(metrics.remote_schedule_count() as f64);
254 self.global
255 .spawned_tasks_count()
256 .set(metrics.spawned_tasks_count() as f64);
257
258 for (worker_idx, worker) in self.workers.iter().enumerate() {
259 worker.update(worker_idx, metrics);
260 }
261 }
262}