Skip to main content

saluki_app/metrics/
mod.rs

1//! Metrics.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use metrics::{gauge, Level};
7use saluki_common::sync::shutdown::ShutdownHandle;
8use saluki_core::{
9    observability::metrics::MetricsFlusherWorker,
10    runtime::{InitializationError, Supervisable, SupervisorFuture},
11};
12use saluki_error::GenericError;
13use saluki_metrics::static_metrics;
14use tokio::{runtime::Handle, select, time::sleep};
15
16mod api;
17pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
18
19/// The set of workers spawned by [`initialize_metrics`].
20///
21/// Each worker must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] for the metrics subsystem to
22/// fully function: the flusher worker propagates internal metrics to subscribers, the runtime worker emits Tokio
23/// runtime gauges, and the override processor asserts the privileged API routes and handles dynamic filter
24/// overrides driven through them.
25pub(crate) struct MetricsWorkers {
26    pub runtime: RuntimeMetricsWorker,
27    pub flusher: MetricsFlusherWorker,
28    pub override_processor: MetricsOverrideWorker,
29}
30
31/// Initializes the metrics subsystem for `metrics`.
32///
33/// The given prefix is used to namespace all metrics that are emitted by the application, and is prepended to all
34/// metrics, followed by a period (for example, `<prefix>.<metric name>`). The given default level seeds the runtime
35/// filter and is what the filter is restored to when [`MetricsAPIHandler`]'s reset route is invoked.
36///
37/// Returns a [`MetricsWorkers`] bundle containing the supervisable workers needed to drive the metrics subsystem
38/// at runtime.
39///
40/// # Errors
41///
42/// If the metrics subsystem was already initialized, an error will be returned.
43pub(crate) async fn initialize_metrics(
44    metrics_prefix: impl Into<String>, default_level: Level,
45) -> Result<MetricsWorkers, GenericError> {
46    // We forward to the implementation in `saluki_core` so that we can have this crate be the collection point of all
47    // helpers/types that are specific to generic application setup/initialization.
48    //
49    // The implementation itself has to live in `saluki_core`, however, to have access to all of the underlying types
50    // that are created and used to install the global recorder, such that they need not be exposed publicly.
51    let (filter_handle, flusher) =
52        saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into(), default_level).await?;
53
54    let override_processor = MetricsOverrideWorker::new(filter_handle);
55
56    // Capture the current runtime handle eagerly so the runtime metrics worker measures the runtime that owns
57    // bootstrap, regardless of where the worker future eventually executes under the supervisor.
58    let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
59
60    Ok(MetricsWorkers {
61        runtime,
62        flusher,
63        override_processor,
64    })
65}
66
67/// Emits the startup metrics for the application.
68///
69/// This is generally meant to be called after the application has been initialized, in order to indicate the
70/// application has completed start-up and is now running.
71///
72/// Must be called after the metrics subsystem has been initialized.
73pub fn emit_startup_metrics() {
74    let app_details = saluki_metadata::get_app_details();
75    let app_version = if app_details.is_dev_build() {
76        format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
77    } else {
78        app_details.version().raw().to_string()
79    };
80
81    // Emit a "running" metric to indicate that the application is running.
82    gauge!("running", "version" => app_version).set(1.0);
83}
84
85/// Collects Tokio runtime metrics from the given runtime handle.
86///
87/// All metrics generated will include a `runtime_id` label which maps to the given runtime ID. This allows for
88/// differentiating between multiple runtimes that may be running in the same process.
89pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
90    // Grab the total number of runtime workers to properly initialize/register our metrics.
91    let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
92
93    // With our metrics registered, enter the main loop where we periodically scrape the metrics.
94    loop {
95        let latest_runtime_metrics = handle.metrics();
96        runtime_metrics.update(&latest_runtime_metrics);
97
98        sleep(Duration::from_secs(5)).await;
99    }
100}
101
102/// A worker that periodically collects Tokio runtime metrics.
103///
104/// The runtime is captured at construction time so that the metrics measured always describe the runtime that
105/// owned the bootstrap, regardless of where this worker eventually executes under a supervisor.
106pub struct RuntimeMetricsWorker {
107    runtime_id: String,
108    handle: Handle,
109}
110
111impl RuntimeMetricsWorker {
112    /// Creates a new `RuntimeMetricsWorker` that collects metrics from the given runtime.
113    pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
114        Self {
115            runtime_id: runtime_id.into(),
116            handle,
117        }
118    }
119}
120
121#[async_trait]
122impl Supervisable for RuntimeMetricsWorker {
123    fn name(&self) -> &str {
124        "tokio-runtime-metrics-collector"
125    }
126
127    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
128        let runtime_id = self.runtime_id.clone();
129        let handle = self.handle.clone();
130
131        Ok(Box::pin(async move {
132            select! {
133                _ = process_shutdown => {},
134                _ = collect_runtime_metrics(&runtime_id, handle) => {},
135            }
136
137            Ok(())
138        }))
139    }
140}
141
// Declares `WorkerMetrics`, the per-worker set of Tokio runtime gauges.
//
// As used in this file, the generated type is constructed with `new(runtime_id, worker_idx)` (one argument per
// declared label) and exposes one accessor per listed metric (e.g. `local_queue_depth()`), whose value is written
// with `.set(..)`.
//
// NOTE(review): `prefix` presumably namespaces the metric names (`runtime_worker.<metric>`), and `trace_gauge`
// presumably registers a gauge at the trace level -- confirm against the `static_metrics!` definition in
// `saluki_metrics`.
static_metrics!(
    name => WorkerMetrics,
    prefix => runtime_worker,
    labels => [runtime_id: String, worker_idx: String],
    metrics => [
        trace_gauge(local_queue_depth),
        trace_gauge(local_schedule_count),
        trace_gauge(mean_poll_time),
        trace_gauge(noop_count),
        trace_gauge(overflow_count),
        trace_gauge(park_count),
        trace_gauge(park_unpark_count),
        trace_gauge(poll_count),
        trace_gauge(steal_count),
        trace_gauge(steal_operations),
        trace_gauge(total_busy_duration),
    ]
);
160
161impl WorkerMetrics {
162    fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
163        Self::new(runtime_id.to_string(), worker_idx.to_string())
164    }
165
166    fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
167        self.local_queue_depth()
168            .set(metrics.worker_local_queue_depth(worker_idx) as f64);
169        self.local_schedule_count()
170            .set(metrics.worker_local_schedule_count(worker_idx) as f64);
171        self.mean_poll_time()
172            .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
173        self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
174        self.overflow_count()
175            .set(metrics.worker_overflow_count(worker_idx) as f64);
176        self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
177        self.park_unpark_count()
178            .set(metrics.worker_park_unpark_count(worker_idx) as f64);
179        self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
180        self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
181        self.steal_operations()
182            .set(metrics.worker_steal_operations(worker_idx) as f64);
183        self.total_busy_duration()
184            .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
185    }
186}
187
// Declares `GlobalRuntimeMetrics`, the runtime-wide (not per-worker) set of Tokio runtime gauges.
//
// As used in this file, the generated type is constructed with `new(runtime_id)` and exposes one accessor per
// listed metric (e.g. `num_alive_tasks()`), whose value is written with `.set(..)`.
//
// NOTE(review): `prefix` presumably namespaces the metric names (`runtime.<metric>`), and `debug_gauge` presumably
// registers a gauge at the debug level -- confirm against the `static_metrics!` definition in `saluki_metrics`.
static_metrics!(
    name => GlobalRuntimeMetrics,
    prefix => runtime,
    labels => [runtime_id: String],
    metrics => [
        debug_gauge(num_alive_tasks),
        debug_gauge(blocking_queue_depth),
        debug_gauge(budget_forced_yield_count),
        debug_gauge(global_queue_depth),
        debug_gauge(io_driver_fd_deregistered_count),
        debug_gauge(io_driver_fd_registered_count),
        debug_gauge(io_driver_ready_count),
        debug_gauge(num_blocking_threads),
        debug_gauge(num_idle_blocking_threads),
        debug_gauge(num_workers),
        debug_gauge(remote_schedule_count),
        debug_gauge(spawned_tasks_count),
    ],
);
207
208struct RuntimeMetrics {
209    global: GlobalRuntimeMetrics,
210    workers: Vec<WorkerMetrics>,
211}
212
213impl RuntimeMetrics {
214    fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
215        let mut workers = Vec::with_capacity(workers_len);
216        for i in 0..workers_len {
217            workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
218        }
219
220        Self {
221            global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
222            workers,
223        }
224    }
225
226    fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
227        self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
228        self.global
229            .blocking_queue_depth()
230            .set(metrics.blocking_queue_depth() as f64);
231        self.global
232            .budget_forced_yield_count()
233            .set(metrics.budget_forced_yield_count() as f64);
234        self.global
235            .global_queue_depth()
236            .set(metrics.global_queue_depth() as f64);
237        self.global
238            .io_driver_fd_deregistered_count()
239            .set(metrics.io_driver_fd_deregistered_count() as f64);
240        self.global
241            .io_driver_fd_registered_count()
242            .set(metrics.io_driver_fd_registered_count() as f64);
243        self.global
244            .io_driver_ready_count()
245            .set(metrics.io_driver_ready_count() as f64);
246        self.global
247            .num_blocking_threads()
248            .set(metrics.num_blocking_threads() as f64);
249        self.global
250            .num_idle_blocking_threads()
251            .set(metrics.num_idle_blocking_threads() as f64);
252        self.global.num_workers().set(metrics.num_workers() as f64);
253        self.global
254            .remote_schedule_count()
255            .set(metrics.remote_schedule_count() as f64);
256        self.global
257            .spawned_tasks_count()
258            .set(metrics.spawned_tasks_count() as f64);
259
260        for (worker_idx, worker) in self.workers.iter().enumerate() {
261            worker.update(worker_idx, metrics);
262        }
263    }
264}