saluki_app/metrics/
mod.rs

//! Metrics: subsystem initialization, startup metrics, and Tokio runtime metrics collection.
2
3use std::{sync::Mutex, time::Duration};
4
5use metrics::gauge;
6use saluki_common::task::spawn_traced_named;
7use saluki_error::GenericError;
8use saluki_metrics::static_metrics;
9use tokio::{runtime::Handle, time::sleep};
10
11static API_HANDLER: Mutex<Option<MetricsAPIHandler>> = Mutex::new(None);
12
13mod api;
14pub use self::api::MetricsAPIHandler;
15
16/// Initializes the metrics subsystem for `metrics`.
17///
18/// The given prefix is used to namespace all metrics that are emitted by the application, and is prepended to all
19/// metrics, followed by a period (e.g. `<prefix>.<metric name>`).
20///
21/// # Errors
22///
23/// If the metrics subsystem was already initialized, an error will be returned.
24pub(crate) async fn initialize_metrics(metrics_prefix: impl Into<String>) -> Result<(), GenericError> {
25    // We forward to the implementation in `saluki_core` so that we can have this crate be the collection point of all
26    // helpers/types that are specific to generic application setup/initialization.
27    //
28    // The implementation itself has to live in `saluki_core`, however, to have access to all of the underlying types
29    // that are created and used to install the global recorder, such that they need not be exposed publicly.
30    let filter_handle = saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into()).await?;
31    API_HANDLER
32        .lock()
33        .unwrap()
34        .replace(MetricsAPIHandler::new(filter_handle));
35
36    // We also spawn a background task that collects and emits the Tokio runtime metrics.
37    spawn_traced_named(
38        "tokio-runtime-metrics-collector-primary",
39        collect_runtime_metrics("primary"),
40    );
41
42    Ok(())
43}
44
45/// Acquires the metrics API handler.
46///
47/// This function is mutable, and consumes the handler if it's present. This means it should only be called once, and
48/// only after metrics have been initialized via `initialize_metrics`.
49///
50/// The metrics API handler can be used to install API routes which allow dynamically controlling the metrics level
51/// filtering. See [`MetricsAPIHandler`] for more information.
52pub fn acquire_metrics_api_handler() -> Option<MetricsAPIHandler> {
53    API_HANDLER.lock().unwrap().take()
54}
55
56/// Emits the startup metrics for the application.
57///
58/// This is generally meant to be called after the application has been initialized, in order to indicate the
59/// application has completed start-up and is now running.
60///
61/// Must be called after the metrics subsystem has been initialized.
62pub fn emit_startup_metrics() {
63    let app_details = saluki_metadata::get_app_details();
64    let app_version = if app_details.is_dev_build() {
65        format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
66    } else {
67        app_details.version().raw().to_string()
68    };
69
70    // Emit a "running" metric to indicate that the application is running.
71    gauge!("running", "version" => app_version).set(1.0);
72}
73
74/// Collects Tokio runtime metrics on the current Tokio runtime.
75///
76/// All metrics generate will include a `runtime_id` label which maps to the given runtime ID. This allows for
77/// differentiating between multiple runtimes that may be running in the same process.
78pub async fn collect_runtime_metrics(runtime_id: &str) {
79    // Get the handle to the runtime we're executing in, and then grab the total number of runtime workers.
80    //
81    // We need the number of workers to properly initialize/register our metrics.
82    let handle = Handle::current();
83    let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
84
85    // With our metrics registered, enter the main loop where we periodically scrape the metrics.
86    loop {
87        let latest_runtime_metrics = handle.metrics();
88        runtime_metrics.update(&latest_runtime_metrics);
89
90        sleep(Duration::from_secs(5)).await;
91    }
92}
93
94static_metrics!(
95    name => WorkerMetrics,
96    prefix => runtime_worker,
97    labels => [runtime_id: String, worker_idx: String],
98    metrics => [
99        trace_gauge(local_queue_depth),
100        trace_gauge(local_schedule_count),
101        trace_gauge(mean_poll_time),
102        trace_gauge(noop_count),
103        trace_gauge(overflow_count),
104        trace_gauge(park_count),
105        trace_gauge(park_unpark_count),
106        trace_gauge(poll_count),
107        trace_gauge(steal_count),
108        trace_gauge(steal_operations),
109        trace_gauge(total_busy_duration),
110    ]
111);
112
113impl WorkerMetrics {
114    fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
115        Self::new(runtime_id.to_string(), worker_idx.to_string())
116    }
117
118    fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
119        self.local_queue_depth()
120            .set(metrics.worker_local_queue_depth(worker_idx) as f64);
121        self.local_schedule_count()
122            .set(metrics.worker_local_schedule_count(worker_idx) as f64);
123        self.mean_poll_time()
124            .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
125        self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
126        self.overflow_count()
127            .set(metrics.worker_overflow_count(worker_idx) as f64);
128        self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
129        self.park_unpark_count()
130            .set(metrics.worker_park_unpark_count(worker_idx) as f64);
131        self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
132        self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
133        self.steal_operations()
134            .set(metrics.worker_steal_operations(worker_idx) as f64);
135        self.total_busy_duration()
136            .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
137    }
138}
139
140static_metrics!(
141    name => GlobalRuntimeMetrics,
142    prefix => runtime,
143    labels => [runtime_id: String],
144    metrics => [
145        gauge(num_alive_tasks),
146        gauge(blocking_queue_depth),
147        gauge(budget_forced_yield_count),
148        gauge(global_queue_depth),
149        gauge(io_driver_fd_deregistered_count),
150        gauge(io_driver_fd_registered_count),
151        gauge(io_driver_ready_count),
152        gauge(num_blocking_threads),
153        gauge(num_idle_blocking_threads),
154        gauge(num_workers),
155        gauge(remote_schedule_count),
156        gauge(spawned_tasks_count),
157    ],
158);
159
160struct RuntimeMetrics {
161    global: GlobalRuntimeMetrics,
162    workers: Vec<WorkerMetrics>,
163}
164
165impl RuntimeMetrics {
166    fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
167        let mut workers = Vec::with_capacity(workers_len);
168        for i in 0..workers_len {
169            workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
170        }
171
172        Self {
173            global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
174            workers,
175        }
176    }
177
178    fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
179        self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
180        self.global
181            .blocking_queue_depth()
182            .set(metrics.blocking_queue_depth() as f64);
183        self.global
184            .budget_forced_yield_count()
185            .set(metrics.budget_forced_yield_count() as f64);
186        self.global
187            .global_queue_depth()
188            .set(metrics.global_queue_depth() as f64);
189        self.global
190            .io_driver_fd_deregistered_count()
191            .set(metrics.io_driver_fd_deregistered_count() as f64);
192        self.global
193            .io_driver_fd_registered_count()
194            .set(metrics.io_driver_fd_registered_count() as f64);
195        self.global
196            .io_driver_ready_count()
197            .set(metrics.io_driver_ready_count() as f64);
198        self.global
199            .num_blocking_threads()
200            .set(metrics.num_blocking_threads() as f64);
201        self.global
202            .num_idle_blocking_threads()
203            .set(metrics.num_idle_blocking_threads() as f64);
204        self.global.num_workers().set(metrics.num_workers() as f64);
205        self.global
206            .remote_schedule_count()
207            .set(metrics.remote_schedule_count() as f64);
208        self.global
209            .spawned_tasks_count()
210            .set(metrics.spawned_tasks_count() as f64);
211
212        for (worker_idx, worker) in self.workers.iter().enumerate() {
213            worker.update(worker_idx, metrics);
214        }
215    }
216}