// saluki_app/metrics/mod.rs

1//! Metrics.
2
3use std::{sync::Mutex, time::Duration};
4
5use async_trait::async_trait;
6use metrics::gauge;
7use saluki_core::{
8    observability::metrics::MetricsFlusherWorker,
9    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
10};
11use saluki_error::GenericError;
12use saluki_metrics::static_metrics;
13use tokio::{runtime::Handle, select, time::sleep};
14
// Slot for the metrics API handler created by `initialize_metrics`; handed out (at most once)
// via `acquire_metrics_api_handler`, which takes it out of the slot.
static API_HANDLER: Mutex<Option<MetricsAPIHandler>> = Mutex::new(None);
16
17mod api;
18pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
19
/// The set of workers spawned by [`initialize_metrics`].
///
/// Each worker must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] for the metrics subsystem to
/// fully function: the flusher worker propagates internal metrics to subscribers, the runtime worker emits Tokio
/// runtime gauges, and the override processor handles dynamic filter overrides driven by [`MetricsAPIHandler`].
pub(crate) struct MetricsWorkers {
    /// Periodically emits gauges describing the Tokio runtime captured at initialization.
    pub runtime: RuntimeMetricsWorker,
    /// Flushes/propagates internal metrics to subscribers.
    pub flusher: MetricsFlusherWorker,
    /// Processes dynamic metrics-level filter overrides driven by the API handler.
    pub override_processor: MetricsOverrideWorker,
}
30
31/// Initializes the metrics subsystem for `metrics`.
32///
33/// The given prefix is used to namespace all metrics that are emitted by the application, and is prepended to all
34/// metrics, followed by a period (for example, `<prefix>.<metric name>`).
35///
36/// Returns a [`MetricsWorkers`] bundle containing the supervisable workers needed to drive the metrics subsystem
37/// at runtime.
38///
39/// # Errors
40///
41/// If the metrics subsystem was already initialized, an error will be returned.
42pub(crate) async fn initialize_metrics(metrics_prefix: impl Into<String>) -> Result<MetricsWorkers, GenericError> {
43    // We forward to the implementation in `saluki_core` so that we can have this crate be the collection point of all
44    // helpers/types that are specific to generic application setup/initialization.
45    //
46    // The implementation itself has to live in `saluki_core`, however, to have access to all of the underlying types
47    // that are created and used to install the global recorder, such that they need not be exposed publicly.
48    let (filter_handle, flusher) =
49        saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into()).await?;
50
51    let (api_handler, override_processor) = MetricsAPIHandler::new(filter_handle);
52    API_HANDLER.lock().unwrap().replace(api_handler);
53
54    // Capture the current runtime handle eagerly so the runtime metrics worker measures the runtime that owns
55    // bootstrap, regardless of where the worker future eventually executes under the supervisor.
56    let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
57
58    Ok(MetricsWorkers {
59        runtime,
60        flusher,
61        override_processor,
62    })
63}
64
65/// Acquires the metrics API handler.
66///
67/// This function is mutable, and consumes the handler if it's present. This means it should only be called once, and
68/// only after metrics have been initialized via `initialize_metrics`.
69///
70/// The metrics API handler can be used to install API routes which allow dynamically controlling the metrics level
71/// filtering. See [`MetricsAPIHandler`] for more information.
72pub fn acquire_metrics_api_handler() -> Option<MetricsAPIHandler> {
73    API_HANDLER.lock().unwrap().take()
74}
75
76/// Emits the startup metrics for the application.
77///
78/// This is generally meant to be called after the application has been initialized, in order to indicate the
79/// application has completed start-up and is now running.
80///
81/// Must be called after the metrics subsystem has been initialized.
82pub fn emit_startup_metrics() {
83    let app_details = saluki_metadata::get_app_details();
84    let app_version = if app_details.is_dev_build() {
85        format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
86    } else {
87        app_details.version().raw().to_string()
88    };
89
90    // Emit a "running" metric to indicate that the application is running.
91    gauge!("running", "version" => app_version).set(1.0);
92}
93
94/// Collects Tokio runtime metrics from the given runtime handle.
95///
96/// All metrics generated will include a `runtime_id` label which maps to the given runtime ID. This allows for
97/// differentiating between multiple runtimes that may be running in the same process.
98pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
99    // Grab the total number of runtime workers to properly initialize/register our metrics.
100    let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
101
102    // With our metrics registered, enter the main loop where we periodically scrape the metrics.
103    loop {
104        let latest_runtime_metrics = handle.metrics();
105        runtime_metrics.update(&latest_runtime_metrics);
106
107        sleep(Duration::from_secs(5)).await;
108    }
109}
110
/// A worker that periodically collects Tokio runtime metrics.
///
/// The runtime is captured at construction time so that the metrics measured always describe the runtime that
/// owned the bootstrap, regardless of where this worker eventually executes under a supervisor.
pub struct RuntimeMetricsWorker {
    // Label value used to distinguish this runtime's metrics from other runtimes in the same process.
    runtime_id: String,
    // Handle to the Tokio runtime being measured, captured at construction time.
    handle: Handle,
}
119
120impl RuntimeMetricsWorker {
121    /// Creates a new `RuntimeMetricsWorker` that collects metrics from the given runtime.
122    pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
123        Self {
124            runtime_id: runtime_id.into(),
125            handle,
126        }
127    }
128}
129
#[async_trait]
impl Supervisable for RuntimeMetricsWorker {
    fn name(&self) -> &str {
        "tokio-runtime-metrics-collector"
    }

    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
        // Clone what the future needs up front so it can be `'static` and outlive `&self`.
        let runtime_id = self.runtime_id.clone();
        let handle = self.handle.clone();

        Ok(Box::pin(async move {
            // `collect_runtime_metrics` loops forever, so in practice this future only completes
            // (cleanly) once process shutdown is signaled.
            select! {
                _ = collect_runtime_metrics(&runtime_id, handle) => {},
                _ = process_shutdown.wait_for_shutdown() => {},
            }
            Ok(())
        }))
    }
}
149
// Generates the `WorkerMetrics` type: one `runtime_worker.*` trace-level gauge per listed metric,
// labeled by runtime ID and worker index. Values mirror Tokio's per-worker runtime statistics.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        trace_gauge(total_busy_duration),
    ]
);
168
169impl WorkerMetrics {
170    fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
171        Self::new(runtime_id.to_string(), worker_idx.to_string())
172    }
173
174    fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
175        self.local_queue_depth()
176            .set(metrics.worker_local_queue_depth(worker_idx) as f64);
177        self.local_schedule_count()
178            .set(metrics.worker_local_schedule_count(worker_idx) as f64);
179        self.mean_poll_time()
180            .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
181        self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
182        self.overflow_count()
183            .set(metrics.worker_overflow_count(worker_idx) as f64);
184        self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
185        self.park_unpark_count()
186            .set(metrics.worker_park_unpark_count(worker_idx) as f64);
187        self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
188        self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
189        self.steal_operations()
190            .set(metrics.worker_steal_operations(worker_idx) as f64);
191        self.total_busy_duration()
192            .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
193    }
194}
195
// Generates the `GlobalRuntimeMetrics` type: one `runtime.*` gauge per listed metric, labeled by
// runtime ID. Values mirror Tokio's runtime-wide statistics.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        gauge(num_alive_tasks),
        gauge(blocking_queue_depth),
        gauge(budget_forced_yield_count),
        gauge(global_queue_depth),
        gauge(io_driver_fd_deregistered_count),
        gauge(io_driver_fd_registered_count),
        gauge(io_driver_ready_count),
        gauge(num_blocking_threads),
        gauge(num_idle_blocking_threads),
        gauge(num_workers),
        gauge(remote_schedule_count),
        gauge(spawned_tasks_count),
    ],
);
215
/// The full set of gauges for a single Tokio runtime: the runtime-wide metrics plus one set of
/// per-worker metrics for each runtime worker (indexed by worker position).
struct RuntimeMetrics {
    // Runtime-wide gauges.
    global: GlobalRuntimeMetrics,
    // Per-worker gauges; `workers[i]` is labeled with worker index `i`.
    workers: Vec<WorkerMetrics>,
}
220
221impl RuntimeMetrics {
222    fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
223        let mut workers = Vec::with_capacity(workers_len);
224        for i in 0..workers_len {
225            workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
226        }
227
228        Self {
229            global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
230            workers,
231        }
232    }
233
234    fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
235        self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
236        self.global
237            .blocking_queue_depth()
238            .set(metrics.blocking_queue_depth() as f64);
239        self.global
240            .budget_forced_yield_count()
241            .set(metrics.budget_forced_yield_count() as f64);
242        self.global
243            .global_queue_depth()
244            .set(metrics.global_queue_depth() as f64);
245        self.global
246            .io_driver_fd_deregistered_count()
247            .set(metrics.io_driver_fd_deregistered_count() as f64);
248        self.global
249            .io_driver_fd_registered_count()
250            .set(metrics.io_driver_fd_registered_count() as f64);
251        self.global
252            .io_driver_ready_count()
253            .set(metrics.io_driver_ready_count() as f64);
254        self.global
255            .num_blocking_threads()
256            .set(metrics.num_blocking_threads() as f64);
257        self.global
258            .num_idle_blocking_threads()
259            .set(metrics.num_idle_blocking_threads() as f64);
260        self.global.num_workers().set(metrics.num_workers() as f64);
261        self.global
262            .remote_schedule_count()
263            .set(metrics.remote_schedule_count() as f64);
264        self.global
265            .spawned_tasks_count()
266            .set(metrics.spawned_tasks_count() as f64);
267
268        for (worker_idx, worker) in self.workers.iter().enumerate() {
269            worker.update(worker_idx, metrics);
270        }
271    }
272}