Skip to main content

saluki_core/health/
mod.rs

1//! Health registry for tracking component readiness and liveness.
2
3use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7    collections::HashSet,
8    sync::{
9        atomic::{AtomicBool, Ordering::Relaxed},
10        Arc, Mutex,
11    },
12    time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::{pin, time::Instant};
20use tokio::{
21    select,
22    sync::{
23        mpsc::{self, error::TrySendError},
24        Notify,
25    },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long a component has to answer a liveness probe before the probe's timeout entry expires.
///
/// Also used as the freshness window for a component's last probe response when a runner starts, and as the latency
/// value recorded when a component is marked as not live or dead.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
/// Delay before the next liveness probe is scheduled for a component after a probe response or a probe timeout.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
38
/// A handle for updating the health of a component.
///
/// Handles are created by [`HealthRegistry::register_component`].
pub struct Health {
    // Readiness flag and telemetry shared with the registry's `ComponentState` for this component.
    shared: Arc<SharedComponentState>,
    // Receives liveness probe requests for this component.
    request_rx: mpsc::Receiver<LivenessRequest>,
    // Sends liveness probe responses back to the registry.
    response_tx: mpsc::Sender<LivenessResponse>,
    // Notified whenever the component is marked ready, waking tasks waiting in `HealthRegistry::all_ready`.
    readiness_notify: Arc<Notify>,
}
46
47impl Health {
48    /// Marks the component as ready.
49    pub fn mark_ready(&mut self) {
50        self.update_readiness(true);
51    }
52
53    /// Marks the component as not ready.
54    pub fn mark_not_ready(&mut self) {
55        self.update_readiness(false);
56    }
57
58    fn update_readiness(&self, ready: bool) {
59        self.shared.ready.store(ready, Relaxed);
60        self.shared.telemetry.update_readiness(ready);
61
62        // Wake any tasks waiting in `HealthRegistry::all_ready` so they can re-check whether all components are ready.
63        if ready {
64            self.readiness_notify.notify_waiters();
65        }
66    }
67
68    /// Waits for a liveness probe to be sent to the component, and then responds to it.
69    ///
70    /// This should generally be polled as part of a `select!` block to ensure it's checked alongside other
71    /// asynchronous operations.
72    pub async fn live(&mut self) {
73        // Simply wait for the health registry to send us a liveness probe, and if we receive one, we respond back to it
74        // immediately.
75        if let Some(request) = self.request_rx.recv().await {
76            let response = request.into_response();
77            let _ = self.response_tx.send(response).await;
78        }
79    }
80}
81
/// Liveness state tracked for a component.
#[derive(Clone, Copy, Eq, PartialEq)]
enum HealthState {
    /// Set when a liveness probe response has been processed for the component.
    Live,
    /// Initial state; also set when a probe times out or the component's probe results are considered stale.
    Unknown,
    /// Set when the component's probe request channel is found to be closed.
    Dead,
}
88
// Declares the `Telemetry` type holding the per-component health metrics. In this file it is constructed via
// `Telemetry::new` with the `component_id` label value, and each listed metric is accessed through a method of the
// same name (e.g. `component_ready()`).
static_metrics!(
    name => Telemetry,
    prefix => health,
    labels => [component_id: Arc<str>],
    metrics => [
        gauge(component_ready),
        gauge(component_live),
        trace_histogram(component_liveness_latency_seconds),
    ]
);
99
100impl Telemetry {
101    fn from_name(name: &str) -> Self {
102        Self::new(Arc::from(name))
103    }
104
105    fn update_readiness(&self, ready: bool) {
106        self.component_ready().set(if ready { 1.0 } else { 0.0 });
107    }
108
109    fn update_liveness(&self, state: HealthState, response_latency: Duration) {
110        let live = match state {
111            HealthState::Live => 1.0,
112            HealthState::Unknown => 0.0,
113            HealthState::Dead => -1.0,
114        };
115
116        self.component_live().set(live);
117        self.component_liveness_latency_seconds()
118            .record(response_latency.as_secs_f64());
119    }
120}
121
/// State shared between a component's [`Health`] handle and the registry's `ComponentState` entry.
struct SharedComponentState {
    // Readiness flag written through the `Health` handle and read by `ComponentState::is_ready`.
    ready: AtomicBool,
    // Metrics handle for this component.
    telemetry: Telemetry,
}
126
/// Registry-side bookkeeping for a single registered component.
struct ComponentState {
    // Name the component was registered with.
    name: MetaString,
    // Most recently determined liveness state.
    health: HealthState,
    // Readiness flag and telemetry shared with the component's `Health` handle.
    shared: Arc<SharedComponentState>,
    // Sends liveness probe requests to the component's `Health` handle.
    request_tx: mpsc::Sender<LivenessRequest>,
    // When the last processed probe response was sent; initialized to the creation time.
    last_response: Instant,
    // Latency of the last processed probe response.
    last_response_latency: Duration,
}
135
136impl ComponentState {
137    fn new(
138        name: MetaString, response_tx: mpsc::Sender<LivenessResponse>, readiness_notify: Arc<Notify>,
139    ) -> (Self, Health) {
140        let shared = Arc::new(SharedComponentState {
141            ready: AtomicBool::new(false),
142            telemetry: Telemetry::from_name(&name),
143        });
144        let (request_tx, request_rx) = mpsc::channel(1);
145
146        let state = Self {
147            name,
148            health: HealthState::Unknown,
149            shared: Arc::clone(&shared),
150            request_tx,
151            last_response: Instant::now(),
152            last_response_latency: Duration::from_secs(0),
153        };
154
155        let handle = Health {
156            shared,
157            request_rx,
158            response_tx,
159            readiness_notify,
160        };
161
162        (state, handle)
163    }
164
165    fn is_ready(&self) -> bool {
166        // We consider a component ready if it's marked as ready (duh) and it's not dead.
167        //
168        // Being "dead" is a special case as it means the component is very likely not even running at all, not just
169        // responding slowly or deadlocked. In these cases, it can't possibly be ready since it's not even running.
170        self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
171    }
172
173    fn is_live(&self) -> bool {
174        self.health == HealthState::Live
175    }
176
177    fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
178        self.health = HealthState::Live;
179        self.last_response = response_sent;
180        self.last_response_latency = response_latency;
181        self.shared.telemetry.update_liveness(self.health, response_latency);
182    }
183
184    fn mark_not_live(&mut self) {
185        self.health = HealthState::Unknown;
186
187        // We use the default timeout as the latency for when the component is not considered alive.
188        self.shared
189            .telemetry
190            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191    }
192
193    fn mark_dead(&mut self) {
194        self.health = HealthState::Dead;
195
196        // We use the default timeout as the latency for when the component is not considered alive.
197        self.shared
198            .telemetry
199            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
200    }
201}
202
/// A liveness probe sent from the registry runner to a component.
struct LivenessRequest {
    // Index of the probed component in `RegistryState::component_state`.
    component_id: usize,
    // Key of the timeout entry registered for this probe in the runner's `pending_timeouts` queue.
    timeout_key: Key,
    // When the probe was created.
    request_sent: Instant,
}
208
209impl LivenessRequest {
210    fn new(component_id: usize, timeout_key: Key) -> Self {
211        Self {
212            component_id,
213            timeout_key,
214            request_sent: Instant::now(),
215        }
216    }
217
218    fn into_response(self) -> LivenessResponse {
219        LivenessResponse {
220            request: self,
221            response_sent: Instant::now(),
222        }
223    }
224}
225
/// A component's answer to a [`LivenessRequest`].
struct LivenessResponse {
    // The probe being answered, carried back so the runner can identify the component and its timeout entry.
    request: LivenessRequest,
    // When the component created the response.
    response_sent: Instant,
}
230
/// A liveness state change to apply to a component.
enum HealthUpdate {
    /// The component answered a liveness probe.
    Alive {
        /// When the component sent its response.
        last_response: Instant,
        /// Time elapsed between the probe being sent and the response being sent.
        last_response_latency: Duration,
    },
    /// The component's liveness is not currently known.
    Unknown,
    /// The component was detected as dead.
    Dead,
}

impl HealthUpdate {
    /// Returns the lowercase label of the update, as used in log output.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Dead => "dead",
            Self::Unknown => "unknown",
            Self::Alive { .. } => "alive",
        }
    }
}
249
/// Mutable state behind a [`HealthRegistry`], shared with the runner and the API handler.
struct RegistryState {
    // Names of all registered components, used to reject duplicate registrations.
    registered_components: HashSet<MetaString>,
    // Per-component state; a component's ID is its index in this vector.
    component_state: Vec<ComponentState>,
    // Sender cloned into every `Health` handle for delivering probe responses.
    responses_tx: mpsc::Sender<LivenessResponse>,
    // Receiver for probe responses; `None` while a runner holds it.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
    // IDs of components that have been registered but not yet scheduled for probing by the runner.
    pending_components: Vec<usize>,
    // Notified when an entry is added to `pending_components`.
    pending_components_notify: Arc<Notify>,
    // Notified when any component is marked ready.
    readiness_notify: Arc<Notify>,
}
259
260impl RegistryState {
261    fn new() -> Self {
262        let (responses_tx, responses_rx) = mpsc::channel(16);
263
264        Self {
265            registered_components: HashSet::new(),
266            component_state: Vec::new(),
267            responses_tx,
268            responses_rx: Some(responses_rx),
269            pending_components: Vec::new(),
270            pending_components_notify: Arc::new(Notify::new()),
271            readiness_notify: Arc::new(Notify::new()),
272        }
273    }
274}
275
/// A registry of components and their health.
///
/// `HealthRegistry` is responsible for tracking the health of all registered components, by storing both their
/// readiness, which indicates whether or not they're initialized and generally ready to process data, as well as
/// probing their liveness, which indicates if they're currently responding, or able to respond, to requests.
///
/// # Telemetry
///
/// The health registry emits some internal telemetry about the status of registered components. In particular, three
/// metrics are emitted:
///
/// - `health.component_ready`: whether or not a component is ready (`gauge`, `0` for not ready, `1` for ready)
/// - `health.component_live`: whether or not a component is live (`gauge`, `0` for not live/unknown, `1` for live, `-1` for dead)
/// - `health.component_liveness_latency_seconds`: the response latency of the component for liveness probes
///   (`histogram`, in seconds)
///
/// All metrics have a `component_id` tag that corresponds to the name of the component that was given when registering it.
#[derive(Clone)]
pub struct HealthRegistry {
    // Shared registry state; clones of the registry all point at the same state.
    inner: Arc<Mutex<RegistryState>>,
}
297
298impl HealthRegistry {
299    /// Creates an empty registry.
300    pub fn new() -> Self {
301        Self {
302            inner: Arc::new(Mutex::new(RegistryState::new())),
303        }
304    }
305
306    #[cfg(test)]
307    fn state(&self) -> Arc<Mutex<RegistryState>> {
308        Arc::clone(&self.inner)
309    }
310
311    /// Registers a component with the registry.
312    ///
313    /// A handle is returned that must be used by the component to set its readiness as well as respond to liveness
314    /// probes. See [`Health::mark_ready`], [`Health::mark_not_ready`], and [`Health::live`] for more information.
315    pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
316        let mut inner = self.inner.lock().unwrap();
317
318        // Make sure we don't already have this component registered.
319        let name = name.into();
320        if !inner.registered_components.insert(name.clone()) {
321            return None;
322        }
323
324        // Add the component state.
325        let readiness_notify = Arc::clone(&inner.readiness_notify);
326        let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone(), readiness_notify);
327        let component_id = inner.component_state.len();
328        inner.component_state.push(state);
329
330        debug!(component_id, "Registered component '{}'.", name);
331
332        // Mark ourselves as having a pending component that needs to be scheduled.
333        inner.pending_components.push(component_id);
334        inner.pending_components_notify.notify_one();
335
336        Some(handle)
337    }
338
339    /// Gets an API handler for reporting the health of all components.
340    ///
341    /// This handler exposes routes for querying the readiness and liveness of all registered components. See
342    /// [`HealthAPIHandler`] for more information about routes and responses.
343    pub fn api_handler(&self) -> HealthAPIHandler {
344        HealthAPIHandler::from_state(Arc::clone(&self.inner))
345    }
346
347    /// Waits until all registered components are ready.
348    ///
349    /// If no components are registered, or all currently registered components are ready, the method returns immediately. Otherwise,
350    /// the method will return as soon as all registered components transition to ready.
351    ///
352    /// Note that components can be registered while this method is waiting, which will influence how long this method
353    /// takes to return. Callers should ensure that all components have been registered before calling this method.
354    pub async fn all_ready(&self) {
355        self.all_ready_matching(|_| true).await
356    }
357
358    /// Waits until all currently registered components whose name matches `predicate` are ready.
359    ///
360    /// This is a scoped variant of [`all_ready`][Self::all_ready] that only considers components whose name satisfies
361    /// the given predicate, which is useful for waiting on a specific subsystem's components to become ready without
362    /// waiting on every other component in the registry.
363    ///
364    /// If no registered component matches the predicate, the method returns immediately. As with
365    /// [`all_ready`][Self::all_ready], components can be registered while this method is waiting, so callers should
366    /// ensure all components they care about have been registered before calling this method.
367    pub async fn all_ready_matching<F>(&self, predicate: F)
368    where
369        F: Fn(&str) -> bool,
370    {
371        let readiness_notify = {
372            let inner = self.inner.lock().unwrap();
373            Arc::clone(&inner.readiness_notify)
374        };
375
376        loop {
377            // Register as a waiter _before_ checking to avoid missing notifications during the check.
378            let notified = readiness_notify.notified();
379
380            if self.check_ready_matching(&predicate) {
381                return;
382            }
383
384            notified.await;
385        }
386    }
387
388    fn check_ready_matching<F>(&self, predicate: &F) -> bool
389    where
390        F: Fn(&str) -> bool,
391    {
392        let inner = self.inner.lock().unwrap();
393        inner
394            .component_state
395            .iter()
396            .filter(|component| predicate(&component.name))
397            .all(|component| component.is_ready())
398    }
399
400    /// Creates a [`HealthRegistryWorker`] that can be added to a supervisor to run the health registry.
401    ///
402    /// The worker handles the lifecycle of the health registry runner, including registering the health API routes
403    /// dynamically and running the liveness probing event loop.
404    pub fn worker(&self) -> HealthRegistryWorker {
405        HealthRegistryWorker::new(self.clone())
406    }
407
408    pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
409        // Make sure the runner hasn't already been spawned.
410        let (responses_rx, pending_components_notify) = {
411            let mut inner = self.inner.lock().unwrap();
412            let responses_rx = match inner.responses_rx.take() {
413                Some(rx) => rx,
414                None => return Err(generic_error!("health registry already spawned")),
415            };
416
417            let pending_components_notify = Arc::clone(&inner.pending_components_notify);
418            (responses_rx, pending_components_notify)
419        };
420
421        Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
422    }
423}
424
/// A guard that returns the response receiver back to the registry when dropped.
///
/// This allows the health registry runner to be restarted gracefully: whenever the runner task
/// finishes and this guard is dropped (for example, after a shutdown or task cancellation), the
/// receiver is returned to the registry state so that a subsequent call to `spawn()` can succeed.
struct RunnerGuard {
    // Registry state that the receiver is handed back to.
    registry: Arc<Mutex<RegistryState>>,
    // The receiver to hand back; nothing is returned on drop if this is `None`.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
}
434
435impl Drop for RunnerGuard {
436    fn drop(&mut self) {
437        if let Some(rx) = self.responses_rx.take() {
438            let mut inner = self.registry.lock().expect("registry state poisoned");
439            inner.responses_rx = Some(rx);
440            debug!("Returned response receiver to registry state.");
441        }
442    }
443}
444
/// Test-only counters mirroring how many entries the runner has queued.
#[cfg(test)]
struct RunnerState {
    // Number of probes scheduled but not yet sent.
    pending_scheduled_probes: AtomicUsize,
    // Number of probe timeouts registered but not yet expired or cleared.
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates the counters, both starting at zero.
    fn new() -> Self {
        let pending_scheduled_probes = AtomicUsize::new(0);
        let pending_probe_timeouts = AtomicUsize::new(0);
        Self {
            pending_scheduled_probes,
            pending_probe_timeouts,
        }
    }

    /// Current number of scheduled probes.
    fn pending_scheduled_probes(&self) -> usize {
        let counter = &self.pending_scheduled_probes;
        counter.load(Relaxed)
    }

    /// Current number of outstanding probe timeouts.
    fn pending_probe_timeouts(&self) -> usize {
        let counter = &self.pending_probe_timeouts;
        counter.load(Relaxed)
    }

    /// Adds one to the scheduled probe count.
    fn increment_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_add(1, Relaxed);
    }

    /// Adds one to the outstanding timeout count.
    fn increment_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the scheduled probe count.
    fn decrement_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_sub(1, Relaxed);
    }

    /// Subtracts one from the outstanding timeout count.
    fn decrement_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_sub(1, Relaxed);
    }
}
484
/// Drives liveness probing for the components registered in a health registry.
pub(super) struct Runner {
    // Registry state shared with `HealthRegistry` and the API handler.
    registry: Arc<Mutex<RegistryState>>,
    // Component IDs waiting for their next liveness probe to be sent.
    pending_probes: DelayQueue<usize>,
    // Component IDs with an outstanding liveness probe; an entry expiring means the probe timed out.
    pending_timeouts: DelayQueue<usize>,
    // Hands the probe response receiver back to the registry state when dropped.
    guard: RunnerGuard,
    // Notified when a component is registered and needs its first probe scheduled.
    pending_components_notify: Arc<Notify>,
    // Counters of pending probes/timeouts, exposed to tests.
    #[cfg(test)]
    state: Arc<RunnerState>,
}
494
495impl Runner {
496    fn new(
497        registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
498        pending_components_notify: Arc<Notify>,
499    ) -> Self {
500        #[cfg(test)]
501        let state = Arc::new(RunnerState::new());
502
503        let guard = RunnerGuard {
504            registry: Arc::clone(&registry),
505            responses_rx: Some(responses_rx),
506        };
507
508        Self {
509            registry,
510            pending_probes: DelayQueue::new(),
511            pending_timeouts: DelayQueue::new(),
512            guard,
513            pending_components_notify,
514            #[cfg(test)]
515            state,
516        }
517    }
518
519    #[cfg(test)]
520    fn state(&self) -> Arc<RunnerState> {
521        Arc::clone(&self.state)
522    }
523
524    fn drain_pending_components(&mut self) -> Vec<usize> {
525        // Drain all pending components.
526        let mut registry = self.registry.lock().unwrap();
527        registry.pending_components.drain(..).collect()
528    }
529
530    fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
531        let mut registry = self.registry.lock().unwrap();
532        let component_state = &mut registry.component_state[component_id];
533
534        // Check if our component is already dead, in which case we don't need to send a liveness probe.
535        if component_state.request_tx.is_closed() {
536            debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
537            return Some(HealthUpdate::Dead);
538        }
539
540        trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
541
542        // Our component _isn't_ dead, so try to send a liveness probe to it.
543        //
544        // We'll register an entry in `pending_timeouts` that automatically marks the component as not live if we don't
545        // receive a response to the liveness probe within the timeout duration.
546        let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
547
548        #[cfg(test)]
549        self.state.increment_pending_probe_timeouts();
550
551        let request = LivenessRequest::new(component_id, timeout_key);
552        if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
553            debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
554
555            // We failed to send the probe to the component due to the component being dead. We'll drop our pending
556            // timeout as we're going to mark this component dead right now.
557            //
558            // When our send fails due to the channel being full, that's OK: it means it's going to be handled by an
559            // existing timeout and will be probed again later.
560            self.pending_timeouts.remove(&request.timeout_key);
561
562            #[cfg(test)]
563            self.state.decrement_pending_probe_timeouts();
564
565            return Some(HealthUpdate::Dead);
566        }
567
568        None
569    }
570
571    fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
572        #[cfg(test)]
573        self.state.increment_pending_scheduled_probes();
574
575        self.pending_probes.insert(component_id, duration);
576    }
577
578    fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
579        // First, drain any pending components to avoid scheduling them twice.
580        // This handles the case where components were registered before the runner started.
581        let _pending = self.drain_pending_components();
582
583        // Drain any queued probe responses from the previous runner. These responses were sent by
584        // components before the runner shut down but weren't processed. Processing them now updates
585        // `last_response` timestamps, which affects the staleness check below — a response that
586        // arrived just before shutdown should count as fresh.
587        while let Ok(response) = responses_rx.try_recv() {
588            self.handle_component_probe_response(response);
589        }
590
591        // Determine which components have stale probe results. Components whose last response is
592        // within the probe timeout are considered fresh and their health state is preserved,
593        // avoiding unnecessary bursts of failed liveness/readiness probes on runner restart.
594        let (component_count, stale_component_ids) = {
595            let registry = self.registry.lock().unwrap();
596            let now = Instant::now();
597            let stale_ids: Vec<usize> = (0..registry.component_state.len())
598                .filter(|&id| {
599                    now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
600                })
601                .collect();
602            (registry.component_state.len(), stale_ids)
603        };
604
605        // Only reset health to Unknown for components with stale probe results.
606        for &component_id in &stale_component_ids {
607            self.process_component_health_update(component_id, HealthUpdate::Unknown);
608        }
609
610        // Schedule immediate probes for all components regardless of staleness.
611        for component_id in 0..component_count {
612            self.schedule_probe_for_component(component_id, Duration::ZERO);
613        }
614
615        if component_count > 0 {
616            let fresh_count = component_count - stale_component_ids.len();
617            debug!(
618                component_count,
619                fresh_count,
620                stale_count = stale_component_ids.len(),
621                "Scheduled probes for all existing components."
622            );
623        }
624    }
625
626    fn handle_component_probe_response(&mut self, response: LivenessResponse) {
627        let component_id = response.request.component_id;
628        let timeout_key = response.request.timeout_key;
629        let request_sent = response.request.request_sent;
630        let response_sent = response.response_sent;
631        let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
632
633        // Clear any pending timeouts for this component and schedule the next probe.
634        let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
635        if !timeout_was_pending {
636            let mut registry = self.registry.lock().unwrap();
637            let component_state = &mut registry.component_state[component_id];
638
639            debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
640        }
641
642        // Update the component's health to show as alive.
643        let update = HealthUpdate::Alive {
644            last_response: response_sent,
645            last_response_latency: response_latency,
646        };
647        self.process_component_health_update(component_id, update);
648
649        // Only schedule the next probe if we successfully removed the timeout, meaning it hadn't fired yet.
650        // This prevents duplicate probe scheduling when a response arrives after a timeout.
651        if timeout_was_pending {
652            #[cfg(test)]
653            self.state.decrement_pending_probe_timeouts();
654
655            self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
656        }
657    }
658
659    fn handle_component_timeout(&mut self, component_id: usize) {
660        // Update the component's health to show as not alive.
661        self.process_component_health_update(component_id, HealthUpdate::Unknown);
662
663        // Schedule the next probe for this component.
664        self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
665    }
666
667    fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
668        // Update the component's health state based on the given update.
669        let mut registry = self.registry.lock().unwrap();
670        let component_state = &mut registry.component_state[component_id];
671        trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
672
673        match update {
674            HealthUpdate::Alive {
675                last_response,
676                last_response_latency,
677            } => component_state.mark_live(last_response, last_response_latency),
678            HealthUpdate::Unknown => component_state.mark_not_live(),
679            HealthUpdate::Dead => component_state.mark_dead(),
680        }
681    }
682
683    async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
684        info!("Health checker running.");
685
686        // Take the response receiver out of the guard so we can use it in the select loop.
687        // It will be put back when the guard is dropped.
688        let mut responses_rx = self
689            .guard
690            .responses_rx
691            .take()
692            .expect("responses_rx should always be Some when Runner is created");
693
694        // Schedule probes for all existing components. This allows the runner to "pick up where it
695        // left off" after a restart - any components that were registered before the runner was
696        // restarted will be immediately probed.
697        self.schedule_all_existing_components(&mut responses_rx);
698
699        // Pin the shutdown future so we can poll it in the select loop.
700        pin!(shutdown);
701
702        loop {
703            select! {
704                // Shutdown signal received - exit the run loop gracefully.
705                _ = &mut shutdown => {
706                    info!("Health checker shutting down.");
707                    break;
708                },
709
710                // A component has been scheduled to have a liveness probe sent to it.
711                Some(entry) = self.pending_probes.next() => {
712                    #[cfg(test)]
713                    self.state.decrement_pending_scheduled_probes();
714
715                    let component_id = entry.into_inner();
716                    if let Some(health_update) = self.send_component_probe_request(component_id) {
717                        // If we got a health update for this component, that means we detected that it's dead, so we need
718                        // to do an out-of-band update to its health.
719                        self.process_component_health_update(component_id, health_update);
720                    }
721                },
722
723                // A component's outstanding liveness probe has expired.
724                Some(entry) = self.pending_timeouts.next() => {
725                    #[cfg(test)]
726                    self.state.decrement_pending_probe_timeouts();
727
728                    let component_id = entry.into_inner();
729                    self.handle_component_timeout(component_id);
730                },
731
732                // A probe response has been received.
733                Some(response) = responses_rx.recv() => {
734                    self.handle_component_probe_response(response);
735                },
736
737                // A component is pending finalization of their registration.
738                _ = self.pending_components_notify.notified() => {
739                    // Drain all pending components, give them a clean initial state of "unknown", and immediately schedule a probe for them.
740                    let pending_component_ids = self.drain_pending_components();
741                    for pending_component_id in pending_component_ids {
742                        self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
743                        self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
744                    }
745                },
746            }
747        }
748
749        // Put the receiver back in the guard so it can be returned to the registry state when dropped.
750        self.guard.responses_rx = Some(responses_rx);
751
752        // When we exit the loop, the RunnerGuard will be dropped, returning the response receiver
753        // back to the registry state so that a subsequent spawn() can succeed.
754    }
755}
756
757#[cfg(test)]
758mod tests {
759    use std::future::Future;
760
761    use futures::FutureExt as _;
762    use tokio::sync::oneshot;
763    use tokio_test::{
764        assert_pending, assert_ready,
765        task::{spawn, Spawn},
766    };
767
768    use super::*;
769
770    const COMPONENT_ID: &str = "test_component";
771
772    #[track_caller]
773    fn initialize_registry_with_component(
774        component_id: &str,
775    ) -> (
776        Health,
777        Spawn<impl Future<Output = ()>>,
778        Arc<Mutex<RegistryState>>,
779        Arc<RunnerState>,
780    ) {
781        let registry = HealthRegistry::new();
782        let registry_state = registry.state();
783
784        // Add our component to the registry:
785        let handle = registry.register_component(component_id).unwrap();
786
787        // Extract the registry runner task and poll it until it's quiesced.
788        //
789        // This ensures that the component is registered, and that it schedules/sends an initial probe request to the component:
790        let runner = registry.into_runner().expect("should not fail to create runner");
791        let runner_state = runner.state();
792
793        // Create a shutdown future that never resolves (for tests that don't need shutdown).
794        let shutdown = std::future::pending();
795        let registry_task = spawn(runner.run(shutdown));
796
797        (handle, registry_task, registry_state, runner_state)
798    }
799
800    #[track_caller]
801    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
802        assert_pending!(task.poll());
803        while task.is_woken() {
804            assert_pending!(task.poll());
805        }
806    }
807
808    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
809        let state = state.lock().unwrap();
810        state
811            .component_state
812            .iter()
813            .find(|state| state.name == component_id)
814            .map(|state| state.is_live())
815            .unwrap()
816    }
817
818    #[test]
819    fn basic_registration() {
820        let registry = HealthRegistry::new();
821        assert!(registry.register_component(COMPONENT_ID).is_some());
822    }
823
824    #[test]
825    fn duplicate_component_registration_fails() {
826        let registry = HealthRegistry::new();
827
828        // Registering the same component twice should fail:
829        assert!(registry.register_component(COMPONENT_ID).is_some());
830        assert!(registry.register_component(COMPONENT_ID).is_none());
831    }
832
833    #[test]
834    fn duplicate_runner_creation_fails_while_running() {
835        let registry = HealthRegistry::new();
836        let registry2 = registry.clone();
837
838        // First runner creation should succeed. We hold on to it so the RunnerGuard doesn't
839        // return the receiver back to the registry state.
840        let _runner = registry.into_runner().expect("first runner creation should succeed");
841
842        // Second creation should fail while the first runner still holds the receiver.
843        assert!(registry2.into_runner().is_err());
844    }
845
846    #[tokio::test]
847    async fn registry_can_be_respawned_after_shutdown() {
848        let registry = HealthRegistry::new();
849        let registry2 = registry.clone();
850        let registry3 = registry.clone();
851
852        // First runner creation should succeed.
853        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
854        let runner = registry.into_runner().expect("first runner creation should succeed");
855
856        // Run the runner on a spawned task so we can trigger shutdown.
857        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));
858
859        // Trigger shutdown.
860        let _ = shutdown_tx.send(());
861
862        // Wait for the runner to stop.
863        join_handle.await.expect("runner should complete without panic");
864
865        // Now we should be able to create a runner again (the RunnerGuard returned the receiver).
866        let _runner2 = registry2
867            .into_runner()
868            .expect("should be able to create runner after shutdown");
869
870        // But not a third time while the second runner holds the receiver.
871        assert!(
872            registry3.into_runner().is_err(),
873            "should not be able to create runner while one exists"
874        );
875    }
876
877    #[test]
878    fn readiness() {
879        let registry = HealthRegistry::new();
880
881        // An empty registry is always ready, so `all_ready` resolves immediately:
882        let mut all_ready_fut = spawn(registry.all_ready());
883        assert_ready!(all_ready_fut.poll());
884
885        // Components start out as not ready, so adding this component changes the registry to not ready overall:
886        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
887
888        let mut all_ready_fut = spawn(registry.all_ready());
889        assert_pending!(all_ready_fut.poll());
890
891        // Now mark the component as ready. `all_ready` should resolve on the next poll:
892        handle.mark_ready();
893
894        assert!(all_ready_fut.is_woken());
895        assert_ready!(all_ready_fut.poll());
896
897        // Ensure a fresh `all_ready` call immediately observes all components being ready:
898        let mut all_ready_fut = spawn(registry.all_ready());
899        assert_ready!(all_ready_fut.poll());
900
901        // Finally, make sure that the readiness state isn't latched, as `all_ready` should always reflect the current state:
902        handle.mark_not_ready();
903
904        let mut all_ready_fut = spawn(registry.all_ready());
905        assert_pending!(all_ready_fut.poll());
906    }
907
908    #[tokio::test(start_paused = true)]
909    async fn component_responds_before_timeout() {
910        // Create our registry with a registered component:
911        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);
912
913        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
914        // which means the component hasn't been registered yet and no probe request has been sent:
915        let mut live_future = spawn(handle.live());
916        assert_pending!(live_future.poll());
917        assert_eq!(runner_state.pending_probe_timeouts(), 0);
918        assert_eq!(runner_state.pending_scheduled_probes(), 0);
919
920        // Drive our registry task until it us quiesced to ensure the component is registered and that a probe request is sent:
921        drive_until_quiesced(&mut registry);
922        assert_eq!(runner_state.pending_probe_timeouts(), 1);
923        assert_eq!(runner_state.pending_scheduled_probes(), 0);
924
925        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
926        assert!(!component_live(&registry_state, COMPONENT_ID));
927
928        // After polling the registry task, we should have sent a probe request which will have now woken up our `live` future.
929        //
930        // Poll the future which should then respond to the probe request:
931        assert!(live_future.is_woken());
932        assert_ready!(live_future.poll());
933
934        // The registry task should have been woken by the probe response.
935        //
936        // Drive the registry task until it is quiesced and ensure that the component is now live:
937        assert!(registry.is_woken());
938        drive_until_quiesced(&mut registry);
939
940        assert!(component_live(&registry_state, COMPONENT_ID));
941
942        // Since the probe response was received, we should have a pending schedule probe now since this is a "normal" probe now,
943        // and isn't the initial probe request which is scheduled immediately:
944        assert_eq!(runner_state.pending_probe_timeouts(), 0);
945        assert_eq!(runner_state.pending_scheduled_probes(), 1);
946    }
947
948    #[tokio::test(start_paused = true)]
949    async fn component_responds_after_timeout() {
950        // Create our registry with a registered component:
951        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);
952
953        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
954        // which means the component hasn't been registered yet and no probe request has been sent:
955        let mut live_future = spawn(handle.live());
956        assert_pending!(live_future.poll());
957        assert_eq!(runner_state.pending_probe_timeouts(), 0);
958        assert_eq!(runner_state.pending_scheduled_probes(), 0);
959
960        // Drive our registry task until it us quiesced to ensure the component is registered and that a probe request is sent:
961        drive_until_quiesced(&mut registry);
962        assert_eq!(runner_state.pending_probe_timeouts(), 1);
963        assert_eq!(runner_state.pending_scheduled_probes(), 0);
964
965        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
966        assert!(!component_live(&registry_state, COMPONENT_ID));
967
968        // After polling the registry task, we should have sent a probe request which will have now woken up
969        // our `live` future, but we won't yet poll it. In fact, we'll advance time _past_ the probe timeout to simulate
970        // the probe timeout expiring:
971        assert!(live_future.is_woken());
972        assert!(!registry.is_woken());
973
974        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;
975
976        // The registry task should have been woken by the probe timeout expiring.
977        //
978        // Drive the registry task until it is quiesced and ensure that the component is still not live:
979        assert!(registry.is_woken());
980        drive_until_quiesced(&mut registry);
981
982        assert!(!component_live(&registry_state, COMPONENT_ID));
983
984        // Since the probe response was not received, we should have a pending schedule probe now since this is a "normal" probe now,
985        // and isn't the initial probe request which is scheduled immediately:
986        assert_eq!(runner_state.pending_probe_timeouts(), 0);
987        assert_eq!(runner_state.pending_scheduled_probes(), 1);
988
989        // Now, we'll actually drive the `live` future to respond to the probe request, which should mark the component as live:
990        assert_ready!(live_future.poll());
991
992        assert!(registry.is_woken());
993        drive_until_quiesced(&mut registry);
994
995        assert!(component_live(&registry_state, COMPONENT_ID));
996
997        // However, since the first probe response timed out, and we haven't yet fired off our scheduled probe, receiving this late
998        // response should not trigger the scheduling of another probe:
999        assert_eq!(runner_state.pending_probe_timeouts(), 0);
1000        assert_eq!(runner_state.pending_scheduled_probes(), 1);
1001    }
1002
1003    #[track_caller]
1004    #[allow(clippy::type_complexity)]
1005    fn initialize_registry_with_component_and_shutdown(
1006        component_id: &str,
1007    ) -> (
1008        Health,
1009        Spawn<impl Future<Output = ()>>,
1010        Arc<Mutex<RegistryState>>,
1011        Arc<RunnerState>,
1012        oneshot::Sender<()>,
1013    ) {
1014        let registry = HealthRegistry::new();
1015        let registry_state = registry.state();
1016        let handle = registry.register_component(component_id).unwrap();
1017        let runner = registry.into_runner().expect("should not fail to create runner");
1018        let runner_state = runner.state();
1019
1020        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
1021        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));
1022
1023        (handle, registry_task, registry_state, runner_state, shutdown_tx)
1024    }
1025
1026    #[tokio::test(start_paused = true)]
1027    async fn respawn_preserves_fresh_component_health() {
1028        // Create our registry with a registered component and drive the runner until the initial probe is sent:
1029        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
1030            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
1031        drive_until_quiesced(&mut registry);
1032
1033        // Respond to the probe request so the component becomes live:
1034        let mut live_future = spawn(handle.live());
1035        assert_ready!(live_future.poll());
1036        drive_until_quiesced(&mut registry);
1037        assert!(component_live(&registry_state, COMPONENT_ID));
1038
1039        // Shut down the runner gracefully, which returns the response receiver to the registry state:
1040        let _ = shutdown_tx.send(());
1041        assert_ready!(registry.poll());
1042
1043        // Respawn the runner immediately (no time advance), so the probe result is still fresh:
1044        let registry = HealthRegistry {
1045            inner: Arc::clone(&registry_state),
1046        };
1047        let runner = registry
1048            .into_runner()
1049            .expect("should be able to respawn after shutdown");
1050        let _runner_state = runner.state();
1051        let mut registry = spawn(runner.run(std::future::pending()));
1052        drive_until_quiesced(&mut registry);
1053
1054        // The component's health should be preserved as Live since its last response is fresh:
1055        assert!(component_live(&registry_state, COMPONENT_ID));
1056    }
1057
1058    #[tokio::test(start_paused = true)]
1059    async fn respawn_resets_stale_component_health() {
1060        // Create our registry with a registered component and drive the runner until the initial probe is sent:
1061        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
1062            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
1063        drive_until_quiesced(&mut registry);
1064
1065        // Respond to the probe request so the component becomes live:
1066        let mut live_future = spawn(handle.live());
1067        assert_ready!(live_future.poll());
1068        drive_until_quiesced(&mut registry);
1069        assert!(component_live(&registry_state, COMPONENT_ID));
1070
1071        // Shut down the runner gracefully, which returns the response receiver to the registry state:
1072        let _ = shutdown_tx.send(());
1073        assert_ready!(registry.poll());
1074
1075        // Advance time past the probe timeout so the last response becomes stale:
1076        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;
1077
1078        // Respawn the runner. The stale component should be reset to Unknown:
1079        let registry = HealthRegistry {
1080            inner: Arc::clone(&registry_state),
1081        };
1082        let runner = registry
1083            .into_runner()
1084            .expect("should be able to respawn after shutdown");
1085        let _runner_state = runner.state();
1086        let mut registry = spawn(runner.run(std::future::pending()));
1087        drive_until_quiesced(&mut registry);
1088
1089        // The component's health should have been reset to Unknown since its last response is stale:
1090        assert!(!component_live(&registry_state, COMPONENT_ID));
1091    }
1092}