Skip to main content

saluki_app/metrics/
mod.rs

1//! Metrics.
2
3use std::time::Duration;
4
5use async_trait::async_trait;
6use metrics::{gauge, Level};
7use saluki_core::{
8    observability::metrics::MetricsFlusherWorker,
9    runtime::{InitializationError, ProcessShutdown, Supervisable, SupervisorFuture},
10};
11use saluki_error::GenericError;
12use saluki_metrics::static_metrics;
13use tokio::{runtime::Handle, select, time::sleep};
14
15mod api;
16pub use self::api::{MetricsAPIHandler, MetricsOverrideWorker};
17
18/// The set of workers spawned by [`initialize_metrics`].
19///
20/// Each worker must be added to a [`Supervisor`][saluki_core::runtime::Supervisor] for the metrics subsystem to
21/// fully function: the flusher worker propagates internal metrics to subscribers, the runtime worker emits Tokio
22/// runtime gauges, and the override processor asserts the privileged API routes and handles dynamic filter
23/// overrides driven through them.
24pub(crate) struct MetricsWorkers {
25    pub runtime: RuntimeMetricsWorker,
26    pub flusher: MetricsFlusherWorker,
27    pub override_processor: MetricsOverrideWorker,
28}
29
30/// Initializes the metrics subsystem for `metrics`.
31///
32/// The given prefix is used to namespace all metrics that are emitted by the application, and is prepended to all
33/// metrics, followed by a period (for example, `<prefix>.<metric name>`). The given default level seeds the runtime
34/// filter and is what the filter is restored to when [`MetricsAPIHandler`]'s reset route is invoked.
35///
36/// Returns a [`MetricsWorkers`] bundle containing the supervisable workers needed to drive the metrics subsystem
37/// at runtime.
38///
39/// # Errors
40///
41/// If the metrics subsystem was already initialized, an error will be returned.
42pub(crate) async fn initialize_metrics(
43    metrics_prefix: impl Into<String>, default_level: Level,
44) -> Result<MetricsWorkers, GenericError> {
45    // We forward to the implementation in `saluki_core` so that we can have this crate be the collection point of all
46    // helpers/types that are specific to generic application setup/initialization.
47    //
48    // The implementation itself has to live in `saluki_core`, however, to have access to all of the underlying types
49    // that are created and used to install the global recorder, such that they need not be exposed publicly.
50    let (filter_handle, flusher) =
51        saluki_core::observability::metrics::initialize_metrics(metrics_prefix.into(), default_level).await?;
52
53    let override_processor = MetricsOverrideWorker::new(filter_handle);
54
55    // Capture the current runtime handle eagerly so the runtime metrics worker measures the runtime that owns
56    // bootstrap, regardless of where the worker future eventually executes under the supervisor.
57    let runtime = RuntimeMetricsWorker::new("primary", Handle::current());
58
59    Ok(MetricsWorkers {
60        runtime,
61        flusher,
62        override_processor,
63    })
64}
65
66/// Emits the startup metrics for the application.
67///
68/// This is generally meant to be called after the application has been initialized, in order to indicate the
69/// application has completed start-up and is now running.
70///
71/// Must be called after the metrics subsystem has been initialized.
72pub fn emit_startup_metrics() {
73    let app_details = saluki_metadata::get_app_details();
74    let app_version = if app_details.is_dev_build() {
75        format!("{}-dev-{}", app_details.version().raw(), app_details.git_hash(),)
76    } else {
77        app_details.version().raw().to_string()
78    };
79
80    // Emit a "running" metric to indicate that the application is running.
81    gauge!("running", "version" => app_version).set(1.0);
82}
83
84/// Collects Tokio runtime metrics from the given runtime handle.
85///
86/// All metrics generated will include a `runtime_id` label which maps to the given runtime ID. This allows for
87/// differentiating between multiple runtimes that may be running in the same process.
88pub async fn collect_runtime_metrics(runtime_id: &str, handle: Handle) {
89    // Grab the total number of runtime workers to properly initialize/register our metrics.
90    let runtime_metrics = RuntimeMetrics::with_workers(runtime_id, handle.metrics().num_workers());
91
92    // With our metrics registered, enter the main loop where we periodically scrape the metrics.
93    loop {
94        let latest_runtime_metrics = handle.metrics();
95        runtime_metrics.update(&latest_runtime_metrics);
96
97        sleep(Duration::from_secs(5)).await;
98    }
99}
100
101/// A worker that periodically collects Tokio runtime metrics.
102///
103/// The runtime is captured at construction time so that the metrics measured always describe the runtime that
104/// owned the bootstrap, regardless of where this worker eventually executes under a supervisor.
105pub struct RuntimeMetricsWorker {
106    runtime_id: String,
107    handle: Handle,
108}
109
110impl RuntimeMetricsWorker {
111    /// Creates a new `RuntimeMetricsWorker` that collects metrics from the given runtime.
112    pub fn new<S: Into<String>>(runtime_id: S, handle: Handle) -> Self {
113        Self {
114            runtime_id: runtime_id.into(),
115            handle,
116        }
117    }
118}
119
120#[async_trait]
121impl Supervisable for RuntimeMetricsWorker {
122    fn name(&self) -> &str {
123        "tokio-runtime-metrics-collector"
124    }
125
126    async fn initialize(&self, mut process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
127        let runtime_id = self.runtime_id.clone();
128        let handle = self.handle.clone();
129
130        Ok(Box::pin(async move {
131            select! {
132                _ = collect_runtime_metrics(&runtime_id, handle) => {},
133                _ = process_shutdown.wait_for_shutdown() => {},
134            }
135            Ok(())
136        }))
137    }
138}
139
// Per-worker Tokio runtime gauges. The generated `WorkerMetrics` type is labelled with both the runtime ID and the
// worker index, and its values are filled in by `WorkerMetrics::update`. `trace_gauge` presumably registers these at
// trace level; confirm against the `static_metrics!` definition.
//
// Plain comments are used here because rustdoc ignores doc comments on macro invocations, and rustc warns about them.
140static_metrics!(
141    name => WorkerMetrics,
142    prefix => runtime_worker,
143    labels => [runtime_id: String, worker_idx: String],
    // `mean_poll_time` and `total_busy_duration` are set from `Duration::as_nanos`, i.e. in nanoseconds. The other
    // values are set directly from Tokio's per-worker accessors, converted to `f64`.
144    metrics => [
145        trace_gauge(local_queue_depth),
146        trace_gauge(local_schedule_count),
147        trace_gauge(mean_poll_time),
148        trace_gauge(noop_count),
149        trace_gauge(overflow_count),
150        trace_gauge(park_count),
151        trace_gauge(park_unpark_count),
152        trace_gauge(poll_count),
153        trace_gauge(steal_count),
154        trace_gauge(steal_operations),
155        trace_gauge(total_busy_duration),
156    ]
157);
158
159impl WorkerMetrics {
160    fn with_worker_idx(runtime_id: &str, worker_idx: usize) -> Self {
161        Self::new(runtime_id.to_string(), worker_idx.to_string())
162    }
163
164    fn update(&self, worker_idx: usize, metrics: &tokio::runtime::RuntimeMetrics) {
165        self.local_queue_depth()
166            .set(metrics.worker_local_queue_depth(worker_idx) as f64);
167        self.local_schedule_count()
168            .set(metrics.worker_local_schedule_count(worker_idx) as f64);
169        self.mean_poll_time()
170            .set(metrics.worker_mean_poll_time(worker_idx).as_nanos() as f64);
171        self.noop_count().set(metrics.worker_noop_count(worker_idx) as f64);
172        self.overflow_count()
173            .set(metrics.worker_overflow_count(worker_idx) as f64);
174        self.park_count().set(metrics.worker_park_count(worker_idx) as f64);
175        self.park_unpark_count()
176            .set(metrics.worker_park_unpark_count(worker_idx) as f64);
177        self.poll_count().set(metrics.worker_poll_count(worker_idx) as f64);
178        self.steal_count().set(metrics.worker_steal_count(worker_idx) as f64);
179        self.steal_operations()
180            .set(metrics.worker_steal_operations(worker_idx) as f64);
181        self.total_busy_duration()
182            .set(metrics.worker_total_busy_duration(worker_idx).as_nanos() as f64);
183    }
184}
185
// Runtime-wide Tokio gauges, labelled only with the runtime ID. The generated `GlobalRuntimeMetrics` type is filled in
// by `RuntimeMetrics::update`, which sets each gauge from the Tokio `RuntimeMetrics` accessor of the same name,
// converted to `f64`. `debug_gauge` presumably registers these at debug level; confirm against the `static_metrics!`
// definition.
//
// Plain comments are used here because rustdoc ignores doc comments on macro invocations, and rustc warns about them.
186static_metrics!(
187    name => GlobalRuntimeMetrics,
188    prefix => runtime,
189    labels => [runtime_id: String],
190    metrics => [
191        debug_gauge(num_alive_tasks),
192        debug_gauge(blocking_queue_depth),
193        debug_gauge(budget_forced_yield_count),
194        debug_gauge(global_queue_depth),
195        debug_gauge(io_driver_fd_deregistered_count),
196        debug_gauge(io_driver_fd_registered_count),
197        debug_gauge(io_driver_ready_count),
198        debug_gauge(num_blocking_threads),
199        debug_gauge(num_idle_blocking_threads),
200        debug_gauge(num_workers),
201        debug_gauge(remote_schedule_count),
202        debug_gauge(spawned_tasks_count),
203    ],
204);
205
206struct RuntimeMetrics {
207    global: GlobalRuntimeMetrics,
208    workers: Vec<WorkerMetrics>,
209}
210
211impl RuntimeMetrics {
212    fn with_workers(runtime_id: &str, workers_len: usize) -> Self {
213        let mut workers = Vec::with_capacity(workers_len);
214        for i in 0..workers_len {
215            workers.push(WorkerMetrics::with_worker_idx(runtime_id, i));
216        }
217
218        Self {
219            global: GlobalRuntimeMetrics::new(runtime_id.to_string()),
220            workers,
221        }
222    }
223
224    fn update(&self, metrics: &tokio::runtime::RuntimeMetrics) {
225        self.global.num_alive_tasks().set(metrics.num_alive_tasks() as f64);
226        self.global
227            .blocking_queue_depth()
228            .set(metrics.blocking_queue_depth() as f64);
229        self.global
230            .budget_forced_yield_count()
231            .set(metrics.budget_forced_yield_count() as f64);
232        self.global
233            .global_queue_depth()
234            .set(metrics.global_queue_depth() as f64);
235        self.global
236            .io_driver_fd_deregistered_count()
237            .set(metrics.io_driver_fd_deregistered_count() as f64);
238        self.global
239            .io_driver_fd_registered_count()
240            .set(metrics.io_driver_fd_registered_count() as f64);
241        self.global
242            .io_driver_ready_count()
243            .set(metrics.io_driver_ready_count() as f64);
244        self.global
245            .num_blocking_threads()
246            .set(metrics.num_blocking_threads() as f64);
247        self.global
248            .num_idle_blocking_threads()
249            .set(metrics.num_idle_blocking_threads() as f64);
250        self.global.num_workers().set(metrics.num_workers() as f64);
251        self.global
252            .remote_schedule_count()
253            .set(metrics.remote_schedule_count() as f64);
254        self.global
255            .spawned_tasks_count()
256            .set(metrics.spawned_tasks_count() as f64);
257
258        for (worker_idx, worker) in self.workers.iter().enumerate() {
259            worker.update(worker_idx, metrics);
260        }
261    }
262}