// saluki_core/health/mod.rs
1//! Health registry for tracking component readiness and liveness.
2
3use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7    collections::HashSet,
8    sync::{
9        atomic::{AtomicBool, Ordering::Relaxed},
10        Arc, Mutex,
11    },
12    time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::time::Instant;
20use tokio::{
21    select,
22    sync::{
23        mpsc::{self, error::TrySendError},
24        Notify,
25    },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long a component has to respond to a liveness probe before it is considered not live.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
/// How long to wait after a probe response (or timeout) before sending the next probe.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
38
/// A handle for updating the health of a component.
pub struct Health {
    /// State shared with the registry: the readiness flag and per-component telemetry.
    shared: Arc<SharedComponentState>,
    /// Receives liveness probe requests from the registry runner.
    request_rx: mpsc::Receiver<LivenessRequest>,
    /// Sends probe responses back to the registry runner.
    response_tx: mpsc::Sender<LivenessResponse>,
}
45
46impl Health {
47    /// Marks the component as ready.
48    pub fn mark_ready(&mut self) {
49        self.update_readiness(true);
50    }
51
52    /// Marks the component as not ready.
53    pub fn mark_not_ready(&mut self) {
54        self.update_readiness(false);
55    }
56
57    fn update_readiness(&self, ready: bool) {
58        self.shared.ready.store(ready, Relaxed);
59        self.shared.telemetry.update_readiness(ready);
60    }
61
62    /// Waits for a liveness probe to be sent to the component, and then responds to it.
63    ///
64    /// This should generally be polled as part of a `select!` block to ensure it is checked alongside other
65    /// asynchronous operations.
66    pub async fn live(&mut self) {
67        // Simply wait for the health registry to send us a liveness probe, and if we receive one, we respond back to it
68        // immediately.
69        if let Some(request) = self.request_rx.recv().await {
70            let response = request.into_response();
71            let _ = self.response_tx.send(response).await;
72        }
73    }
74}
75
/// The liveness state of a component, as observed by the registry.
#[derive(Clone, Copy, Eq, PartialEq)]
enum HealthState {
    /// The component responded to its most recent liveness probe.
    Live,
    /// The component has not (yet) responded to a liveness probe in time.
    Unknown,
    /// The component's probe channel is closed, so it is very likely not running at all.
    Dead,
}
82
// Per-component health telemetry, labeled by `component_id`:
// - `health.component_ready`: readiness gauge (0 = not ready, 1 = ready)
// - `health.component_live`: liveness gauge (-1 = dead, 0 = unknown, 1 = live)
// - `health.component_liveness_latency_seconds`: probe response latency histogram, in seconds
static_metrics!(
    name => Telemetry,
    prefix => health,
    labels => [component_id: Arc<str>],
    metrics => [
        gauge(component_ready),
        gauge(component_live),
        trace_histogram(component_liveness_latency_seconds),
    ]
);
93
94impl Telemetry {
95    fn from_name(name: &str) -> Self {
96        Self::new(Arc::from(name))
97    }
98
99    fn update_readiness(&self, ready: bool) {
100        self.component_ready().set(if ready { 1.0 } else { 0.0 });
101    }
102
103    fn update_liveness(&self, state: HealthState, response_latency: Duration) {
104        let live = match state {
105            HealthState::Live => 1.0,
106            HealthState::Unknown => 0.0,
107            HealthState::Dead => -1.0,
108        };
109
110        self.component_live().set(live);
111        self.component_liveness_latency_seconds()
112            .record(response_latency.as_secs_f64());
113    }
114}
115
/// Component state shared between the registry and the component's `Health` handle.
struct SharedComponentState {
    /// Whether the component has marked itself ready.
    ready: AtomicBool,
    /// Telemetry emitter for this component's readiness/liveness metrics.
    telemetry: Telemetry,
}
120
/// Registry-side state for a single registered component.
struct ComponentState {
    /// The component's registered name.
    name: MetaString,
    /// Last observed liveness state.
    health: HealthState,
    /// State shared with the component's `Health` handle.
    shared: Arc<SharedComponentState>,
    /// Sends liveness probe requests to the component.
    request_tx: mpsc::Sender<LivenessRequest>,
    /// When the component last responded to a probe.
    last_response: Instant,
    /// How long the component took to respond to its last probe.
    last_response_latency: Duration,
}
129
130impl ComponentState {
131    fn new(name: MetaString, response_tx: mpsc::Sender<LivenessResponse>) -> (Self, Health) {
132        let shared = Arc::new(SharedComponentState {
133            ready: AtomicBool::new(false),
134            telemetry: Telemetry::from_name(&name),
135        });
136        let (request_tx, request_rx) = mpsc::channel(1);
137
138        let state = Self {
139            name,
140            health: HealthState::Unknown,
141            shared: Arc::clone(&shared),
142            request_tx,
143            last_response: Instant::now(),
144            last_response_latency: Duration::from_secs(0),
145        };
146
147        let handle = Health {
148            shared,
149            request_rx,
150            response_tx,
151        };
152
153        (state, handle)
154    }
155
156    fn is_ready(&self) -> bool {
157        // We consider a component ready if it's marked as ready (duh) and it's not dead.
158        //
159        // Being "dead" is a special case as it means the component is very likely not even running at all, not just
160        // responding slowly or deadlocked. In these cases, it can't possibly be ready since it's not even running.
161        self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
162    }
163
164    fn is_live(&self) -> bool {
165        self.health == HealthState::Live
166    }
167
168    fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
169        self.health = HealthState::Live;
170        self.last_response = response_sent;
171        self.last_response_latency = response_latency;
172        self.shared.telemetry.update_liveness(self.health, response_latency);
173    }
174
175    fn mark_not_live(&mut self) {
176        self.health = HealthState::Unknown;
177
178        // We use the default timeout as the latency for when the component is not considered alive.
179        self.shared
180            .telemetry
181            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
182    }
183
184    fn mark_dead(&mut self) {
185        self.health = HealthState::Dead;
186
187        // We use the default timeout as the latency for when the component is not considered alive.
188        self.shared
189            .telemetry
190            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191    }
192}
193
/// A liveness probe request sent to a component.
struct LivenessRequest {
    /// Index of the component in the registry's component list.
    component_id: usize,
    /// Key of the timeout entry that fires if no response arrives in time.
    timeout_key: Key,
    /// When the probe was sent.
    request_sent: Instant,
}
199
200impl LivenessRequest {
201    fn new(component_id: usize, timeout_key: Key) -> Self {
202        Self {
203            component_id,
204            timeout_key,
205            request_sent: Instant::now(),
206        }
207    }
208
209    fn into_response(self) -> LivenessResponse {
210        LivenessResponse {
211            request: self,
212            response_sent: Instant::now(),
213        }
214    }
215}
216
/// A component's response to a liveness probe.
struct LivenessResponse {
    /// The original probe request being answered.
    request: LivenessRequest,
    /// When the component sent its response.
    response_sent: Instant,
}
221
/// An update to a component's health state, applied by the registry runner.
enum HealthUpdate {
    /// The component responded to a probe.
    Alive {
        /// When the response was sent.
        last_response: Instant,
        /// How long the component took to respond.
        last_response_latency: Duration,
    },
    /// The component failed to respond before the probe timed out.
    Unknown,
    /// The component's probe channel is closed, so it is presumed not running.
    Dead,
}
230
231impl HealthUpdate {
232    fn as_str(&self) -> &'static str {
233        match self {
234            HealthUpdate::Alive { .. } => "alive",
235            HealthUpdate::Unknown => "unknown",
236            HealthUpdate::Dead => "dead",
237        }
238    }
239}
240
/// Shared state backing a `HealthRegistry`.
struct RegistryState {
    /// Names of all registered components, used to reject duplicate registrations.
    registered_components: HashSet<MetaString>,
    /// Per-component state, indexed by component ID.
    component_state: Vec<ComponentState>,
    /// Sender cloned into each component for delivering probe responses.
    responses_tx: mpsc::Sender<LivenessResponse>,
    /// Receiver for probe responses; taken by the runner while it is active.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
    /// IDs of components that have been registered but not yet scheduled for probing.
    pending_components: Vec<usize>,
    /// Wakes the runner when new pending components are added.
    pending_components_notify: Arc<Notify>,
}
249
250impl RegistryState {
251    fn new() -> Self {
252        let (responses_tx, responses_rx) = mpsc::channel(16);
253
254        Self {
255            registered_components: HashSet::new(),
256            component_state: Vec::new(),
257            responses_tx,
258            responses_rx: Some(responses_rx),
259            pending_components: Vec::new(),
260            pending_components_notify: Arc::new(Notify::new()),
261        }
262    }
263}
264
/// A registry of components and their health.
///
/// `HealthRegistry` is responsible for tracking the health of all registered components, by storing both their
/// readiness, which indicates whether or not they are initialized and generally ready to process data, as well as
/// probing their liveness, which indicates if they're currently responding, or able to respond, to requests.
///
/// # Telemetry
///
/// The health registry emits some internal telemetry about the status of registered components. In particular, three
/// metrics are emitted:
///
/// - `health.component_ready`: whether or not a component is ready (`gauge`, `0` for not ready, `1` for ready)
/// - `health.component_live`: whether or not a component is live (`gauge`, `0` for not live/unknown, `1` for live, `-1` for dead)
/// - `health.component_liveness_latency_seconds`: the response latency of the component for liveness probes (`histogram`,
///   in seconds)
///
/// All metrics have a `component_id` tag that corresponds to the name of the component that was given when registering it.
#[derive(Clone)]
pub struct HealthRegistry {
    /// Shared registry state, also handed to the runner and the API handler.
    inner: Arc<Mutex<RegistryState>>,
}
286
287impl HealthRegistry {
288    /// Creates an empty registry.
289    pub fn new() -> Self {
290        Self {
291            inner: Arc::new(Mutex::new(RegistryState::new())),
292        }
293    }
294
295    #[cfg(test)]
296    fn state(&self) -> Arc<Mutex<RegistryState>> {
297        Arc::clone(&self.inner)
298    }
299
300    /// Registers a component with the registry.
301    ///
302    /// A handle is returned that must be used by the component to set its readiness as well as respond to liveness
303    /// probes. See [`Health::mark_ready`], [`Health::mark_not_ready`], and [`Health::live`] for more information.
304    pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
305        let mut inner = self.inner.lock().unwrap();
306
307        // Make sure we don't already have this component registered.
308        let name = name.into();
309        if !inner.registered_components.insert(name.clone()) {
310            return None;
311        }
312
313        // Add the component state.
314        let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone());
315        let component_id = inner.component_state.len();
316        inner.component_state.push(state);
317
318        debug!(component_id, "Registered component '{}'.", name);
319
320        // Mark ourselves as having a pending component that needs to be scheduled.
321        inner.pending_components.push(component_id);
322        inner.pending_components_notify.notify_one();
323
324        Some(handle)
325    }
326
327    /// Gets an API handler for reporting the health of all components.
328    ///
329    /// This handler exposes routes for querying the readiness and liveness of all registered components. See
330    /// [`HealthAPIHandler`] for more information about routes and responses.
331    pub fn api_handler(&self) -> HealthAPIHandler {
332        HealthAPIHandler::from_state(Arc::clone(&self.inner))
333    }
334
335    /// Returns `true` if all components are ready.
336    pub fn all_ready(&self) -> bool {
337        let inner = self.inner.lock().unwrap();
338
339        for component in &inner.component_state {
340            if !component.is_ready() {
341                return false;
342            }
343        }
344
345        true
346    }
347
348    /// Creates a [`HealthRegistryWorker`] that can be added to a supervisor to run the health registry.
349    ///
350    /// The worker handles the lifecycle of the health registry runner, including registering the health API routes
351    /// dynamically and running the liveness probing event loop.
352    pub fn worker(&self) -> HealthRegistryWorker {
353        HealthRegistryWorker::new(self.clone())
354    }
355
356    pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
357        // Make sure the runner hasn't already been spawned.
358        let (responses_rx, pending_components_notify) = {
359            let mut inner = self.inner.lock().unwrap();
360            let responses_rx = match inner.responses_rx.take() {
361                Some(rx) => rx,
362                None => return Err(generic_error!("health registry already spawned")),
363            };
364
365            let pending_components_notify = Arc::clone(&inner.pending_components_notify);
366            (responses_rx, pending_components_notify)
367        };
368
369        Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
370    }
371}
372
/// A guard that returns the response receiver back to the registry when dropped.
///
/// This allows the health registry runner to be restarted gracefully: whenever the runner task
/// finishes and this guard is dropped (for example, after a shutdown or task cancellation), the
/// receiver is returned to the registry state so that a subsequent call to `spawn()` can succeed.
struct RunnerGuard {
    /// The registry state to return the receiver to.
    registry: Arc<Mutex<RegistryState>>,
    /// The receiver, present while the guard owns it; the runner takes it while running.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
}
382
383impl Drop for RunnerGuard {
384    fn drop(&mut self) {
385        if let Some(rx) = self.responses_rx.take() {
386            let mut inner = self.registry.lock().expect("registry state poisoned");
387            inner.responses_rx = Some(rx);
388            debug!("Returned response receiver to registry state.");
389        }
390    }
391}
392
/// Test-only counters for observing the runner's internal scheduling activity.
#[cfg(test)]
struct RunnerState {
    /// Number of probes currently scheduled in the probe delay queue.
    pending_scheduled_probes: AtomicUsize,
    /// Number of probe timeouts currently outstanding.
    pending_probe_timeouts: AtomicUsize,
}
398
#[cfg(test)]
impl RunnerState {
    /// Creates a new state tracker with all counters at zero.
    fn new() -> Self {
        Self {
            pending_scheduled_probes: AtomicUsize::new(0),
            pending_probe_timeouts: AtomicUsize::new(0),
        }
    }

    /// Adds one to the given counter.
    fn bump(counter: &AtomicUsize) {
        counter.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the given counter.
    fn unbump(counter: &AtomicUsize) {
        counter.fetch_sub(1, Relaxed);
    }

    /// Returns the number of probes currently scheduled.
    fn pending_scheduled_probes(&self) -> usize {
        self.pending_scheduled_probes.load(Relaxed)
    }

    /// Returns the number of probe timeouts currently outstanding.
    fn pending_probe_timeouts(&self) -> usize {
        self.pending_probe_timeouts.load(Relaxed)
    }

    fn increment_pending_scheduled_probes(&self) {
        Self::bump(&self.pending_scheduled_probes);
    }

    fn increment_pending_probe_timeouts(&self) {
        Self::bump(&self.pending_probe_timeouts);
    }

    fn decrement_pending_scheduled_probes(&self) {
        Self::unbump(&self.pending_scheduled_probes);
    }

    fn decrement_pending_probe_timeouts(&self) {
        Self::unbump(&self.pending_probe_timeouts);
    }
}
432
/// The event loop that drives liveness probing for all registered components.
pub(super) struct Runner {
    /// Shared registry state.
    registry: Arc<Mutex<RegistryState>>,
    /// Components scheduled to receive a liveness probe, keyed by component ID.
    pending_probes: DelayQueue<usize>,
    /// Outstanding probe timeouts, keyed by component ID.
    pending_timeouts: DelayQueue<usize>,
    /// Returns the response receiver to the registry when the runner is dropped.
    guard: RunnerGuard,
    /// Wakes the runner when new components are registered.
    pending_components_notify: Arc<Notify>,
    /// Test-only counters for observing scheduling activity.
    #[cfg(test)]
    state: Arc<RunnerState>,
}
442
impl Runner {
    /// Creates a runner over the given registry state.
    ///
    /// The response receiver is parked inside a `RunnerGuard` so that, when the runner is dropped,
    /// the receiver is returned to the registry state and a subsequent runner can be created.
    fn new(
        registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
        pending_components_notify: Arc<Notify>,
    ) -> Self {
        #[cfg(test)]
        let state = Arc::new(RunnerState::new());

        let guard = RunnerGuard {
            registry: Arc::clone(&registry),
            responses_rx: Some(responses_rx),
        };

        Self {
            registry,
            pending_probes: DelayQueue::new(),
            pending_timeouts: DelayQueue::new(),
            guard,
            pending_components_notify,
            #[cfg(test)]
            state,
        }
    }

    /// Returns the test-only counters tracking scheduled probes and outstanding timeouts.
    #[cfg(test)]
    fn state(&self) -> Arc<RunnerState> {
        Arc::clone(&self.state)
    }

    /// Takes all component IDs that have been registered but not yet scheduled for probing.
    fn drain_pending_components(&mut self) -> Vec<usize> {
        // Drain all pending components.
        let mut registry = self.registry.lock().unwrap();
        registry.pending_components.drain(..).collect()
    }

    /// Sends a liveness probe request to the given component.
    ///
    /// A timeout entry is registered before sending, so an unanswered probe eventually marks the
    /// component as not live. Returns `Some(HealthUpdate::Dead)` when the component's request
    /// channel is closed (the component is presumed gone), and `None` when the probe was sent or
    /// the channel was merely full (an existing in-flight probe/timeout drives the next step).
    fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
        let mut registry = self.registry.lock().unwrap();
        let component_state = &mut registry.component_state[component_id];

        // Check if our component is already dead, in which case we don't need to send a liveness probe.
        if component_state.request_tx.is_closed() {
            debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
            return Some(HealthUpdate::Dead);
        }

        trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");

        // Our component _isn't_ dead, so try to send a liveness probe to it.
        //
        // We'll register an entry in `pending_timeouts` that automatically marks the component as not live if we don't
        // receive a response to the liveness probe within the timeout duration.
        let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);

        #[cfg(test)]
        self.state.increment_pending_probe_timeouts();

        let request = LivenessRequest::new(component_id, timeout_key);
        if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
            debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");

            // We failed to send the probe to the component due to the component being dead. We'll drop our pending
            // timeout as we're going to mark this component dead right now.
            //
            // When our send fails due to the channel being full, that's OK: it means it's going to be handled by an
            // existing timeout and will be probed again later.
            self.pending_timeouts.remove(&request.timeout_key);

            #[cfg(test)]
            self.state.decrement_pending_probe_timeouts();

            return Some(HealthUpdate::Dead);
        }

        None
    }

    /// Schedules the next liveness probe for the given component to fire after `duration`.
    fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
        #[cfg(test)]
        self.state.increment_pending_scheduled_probes();

        self.pending_probes.insert(component_id, duration);
    }

    /// Schedules immediate probes for every component already present in the registry.
    ///
    /// This is what lets the runner "pick up where it left off" after a restart: components with
    /// fresh probe responses keep their health state, while only stale ones are reset to unknown.
    fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
        // First, drain any pending components to avoid scheduling them twice.
        // This handles the case where components were registered before the runner started.
        let _pending = self.drain_pending_components();

        // Drain any queued probe responses from the previous runner. These responses were sent by
        // components before the runner shut down but weren't processed. Processing them now updates
        // `last_response` timestamps, which affects the staleness check below — a response that
        // arrived just before shutdown should count as fresh.
        while let Ok(response) = responses_rx.try_recv() {
            self.handle_component_probe_response(response);
        }

        // Determine which components have stale probe results. Components whose last response is
        // within the probe timeout are considered fresh and their health state is preserved,
        // avoiding unnecessary bursts of failed liveness/readiness probes on runner restart.
        let (component_count, stale_component_ids) = {
            let registry = self.registry.lock().unwrap();
            let now = Instant::now();
            let stale_ids: Vec<usize> = (0..registry.component_state.len())
                .filter(|&id| {
                    now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
                })
                .collect();
            (registry.component_state.len(), stale_ids)
        };

        // Only reset health to Unknown for components with stale probe results.
        for &component_id in &stale_component_ids {
            self.process_component_health_update(component_id, HealthUpdate::Unknown);
        }

        // Schedule immediate probes for all components regardless of staleness.
        for component_id in 0..component_count {
            self.schedule_probe_for_component(component_id, Duration::ZERO);
        }

        if component_count > 0 {
            let fresh_count = component_count - stale_component_ids.len();
            debug!(
                component_count,
                fresh_count,
                stale_count = stale_component_ids.len(),
                "Scheduled probes for all existing components."
            );
        }
    }

    /// Handles a probe response from a component: clears the matching timeout (if still pending),
    /// marks the component live, and schedules its next probe.
    fn handle_component_probe_response(&mut self, response: LivenessResponse) {
        let component_id = response.request.component_id;
        let timeout_key = response.request.timeout_key;
        let request_sent = response.request.request_sent;
        let response_sent = response.response_sent;
        // `checked_duration_since` guards against a response timestamp earlier than the request's;
        // in that case we record a zero latency rather than panicking.
        let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();

        // Clear any pending timeouts for this component and schedule the next probe.
        let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
        if !timeout_was_pending {
            let mut registry = self.registry.lock().unwrap();
            let component_state = &mut registry.component_state[component_id];

            debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
        }

        // Update the component's health to show as alive.
        let update = HealthUpdate::Alive {
            last_response: response_sent,
            last_response_latency: response_latency,
        };
        self.process_component_health_update(component_id, update);

        // Only schedule the next probe if we successfully removed the timeout, meaning it hadn't fired yet.
        // This prevents duplicate probe scheduling when a response arrives after a timeout.
        if timeout_was_pending {
            #[cfg(test)]
            self.state.decrement_pending_probe_timeouts();

            self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
        }
    }

    /// Handles an expired probe timeout: marks the component as not live and schedules a retry.
    fn handle_component_timeout(&mut self, component_id: usize) {
        // Update the component's health to show as not alive.
        self.process_component_health_update(component_id, HealthUpdate::Unknown);

        // Schedule the next probe for this component.
        self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
    }

    /// Applies a health update to the given component's registry state.
    fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
        // Update the component's health state based on the given update.
        let mut registry = self.registry.lock().unwrap();
        let component_state = &mut registry.component_state[component_id];
        trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");

        match update {
            HealthUpdate::Alive {
                last_response,
                last_response_latency,
            } => component_state.mark_live(last_response, last_response_latency),
            HealthUpdate::Unknown => component_state.mark_not_live(),
            HealthUpdate::Dead => component_state.mark_dead(),
        }
    }

    /// Runs the probing event loop until the given shutdown future resolves.
    ///
    /// The loop multiplexes four event sources: scheduled probes becoming due, probe timeouts
    /// firing, probe responses arriving, and newly registered components needing scheduling.
    async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
        info!("Health checker running.");

        // Take the response receiver out of the guard so we can use it in the select loop.
        // It will be put back when the guard is dropped.
        let mut responses_rx = self
            .guard
            .responses_rx
            .take()
            .expect("responses_rx should always be Some when Runner is created");

        // Schedule probes for all existing components. This allows the runner to "pick up where it
        // left off" after a restart - any components that were registered before the runner was
        // restarted will be immediately probed.
        self.schedule_all_existing_components(&mut responses_rx);

        // Pin the shutdown future so we can poll it in the select loop.
        let mut shutdown = std::pin::pin!(shutdown);

        loop {
            select! {
                // Shutdown signal received - exit the run loop gracefully.
                _ = &mut shutdown => {
                    info!("Health checker shutting down.");
                    break;
                },

                // A component has been scheduled to have a liveness probe sent to it.
                Some(entry) = self.pending_probes.next() => {
                    #[cfg(test)]
                    self.state.decrement_pending_scheduled_probes();

                    let component_id = entry.into_inner();
                    if let Some(health_update) = self.send_component_probe_request(component_id) {
                        // If we got a health update for this component, that means we detected that it's dead, so we need
                        // to do an out-of-band update to its health.
                        self.process_component_health_update(component_id, health_update);
                    }
                },

                // A component's outstanding liveness probe has expired.
                Some(entry) = self.pending_timeouts.next() => {
                    #[cfg(test)]
                    self.state.decrement_pending_probe_timeouts();

                    let component_id = entry.into_inner();
                    self.handle_component_timeout(component_id);
                },

                // A probe response has been received.
                Some(response) = responses_rx.recv() => {
                    self.handle_component_probe_response(response);
                },

                // A component is pending finalization of their registration.
                _ = self.pending_components_notify.notified() => {
                    // Drain all pending components, give them a clean initial state of "unknown", and immediately schedule a probe for them.
                    let pending_component_ids = self.drain_pending_components();
                    for pending_component_id in pending_component_ids {
                        self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
                        self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
                    }
                },
            }
        }

        // Put the receiver back in the guard so it can be returned to the registry state when dropped.
        self.guard.responses_rx = Some(responses_rx);

        // When we exit the loop, the RunnerGuard will be dropped, returning the response receiver
        // back to the registry state so that a subsequent spawn() can succeed.
    }
}
704
705#[cfg(test)]
706mod tests {
707    use std::future::Future;
708
709    use futures::FutureExt as _;
710    use tokio::sync::oneshot;
711    use tokio_test::{
712        assert_pending, assert_ready,
713        task::{spawn, Spawn},
714    };
715
716    use super::*;
717
    const COMPONENT_ID: &str = "test_component";

    /// Builds a registry with a single registered component and spawns its runner on a
    /// `tokio_test` task, returning everything a test needs to drive and observe it.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        // Add our component to the registry:
        let handle = registry.register_component(component_id).unwrap();

        // Extract the registry runner task and poll it until it's quiesced.
        //
        // This ensures that the component is registered, and that it schedules/sends an initial probe request to the component:
        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        // Create a shutdown future that never resolves (for tests that don't need shutdown).
        let shutdown = std::future::pending();
        let registry_task = spawn(runner.run(shutdown));

        (handle, registry_task, registry_state, runner_state)
    }
747
    /// Polls the task until it stops waking itself, asserting it never completes.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }
755
756    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
757        let state = state.lock().unwrap();
758        state
759            .component_state
760            .iter()
761            .find(|state| state.name == component_id)
762            .map(|state| state.is_live())
763            .unwrap()
764    }
765
766    #[test]
767    fn basic_registration() {
768        let registry = HealthRegistry::new();
769        assert!(registry.register_component(COMPONENT_ID).is_some());
770    }
771
772    #[test]
773    fn duplicate_component_registration_fails() {
774        let registry = HealthRegistry::new();
775
776        // Registering the same component twice should fail:
777        assert!(registry.register_component(COMPONENT_ID).is_some());
778        assert!(registry.register_component(COMPONENT_ID).is_none());
779    }
780
781    #[test]
782    fn duplicate_runner_creation_fails_while_running() {
783        let registry = HealthRegistry::new();
784        let registry2 = registry.clone();
785
786        // First runner creation should succeed. We hold on to it so the RunnerGuard doesn't
787        // return the receiver back to the registry state.
788        let _runner = registry.into_runner().expect("first runner creation should succeed");
789
790        // Second creation should fail while the first runner still holds the receiver.
791        assert!(registry2.into_runner().is_err());
792    }
793
794    #[tokio::test]
795    async fn registry_can_be_respawned_after_shutdown() {
796        let registry = HealthRegistry::new();
797        let registry2 = registry.clone();
798        let registry3 = registry.clone();
799
800        // First runner creation should succeed.
801        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
802        let runner = registry.into_runner().expect("first runner creation should succeed");
803
804        // Run the runner on a spawned task so we can trigger shutdown.
805        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));
806
807        // Trigger shutdown.
808        let _ = shutdown_tx.send(());
809
810        // Wait for the runner to stop.
811        join_handle.await.expect("runner should complete without panic");
812
813        // Now we should be able to create a runner again (the RunnerGuard returned the receiver).
814        let _runner2 = registry2
815            .into_runner()
816            .expect("should be able to create runner after shutdown");
817
818        // But not a third time while the second runner holds the receiver.
819        assert!(
820            registry3.into_runner().is_err(),
821            "should not be able to create runner while one exists"
822        );
823    }
824
825    #[test]
826    fn readiness() {
827        let registry = HealthRegistry::new();
828        assert!(registry.all_ready());
829
830        // Components should start out as not ready, so adding this component changes the registry to not ready overall:
831        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
832        assert!(!registry.all_ready());
833
834        // Now mark the component as ready:
835        handle.mark_ready();
836        assert!(registry.all_ready());
837
838        // Now mark the component as not ready:
839        handle.mark_not_ready();
840        assert!(!registry.all_ready());
841    }
842
    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        // Happy path: the component answers its liveness probe before the probe timeout fires,
        // so it should be marked live and the next probe should get scheduled normally.
        //
        // Create our registry with a registered component:
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
        // which means the component hasn't been registered yet and no probe request has been sent:
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Drive our registry task until it is quiesced to ensure the component is registered and that a probe request is sent:
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
        assert!(!component_live(&registry_state, COMPONENT_ID));

        // After polling the registry task, we should have sent a probe request which will have now woken up our `live` future.
        //
        // Poll the future which should then respond to the probe request:
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        // The registry task should have been woken by the probe response.
        //
        // Drive the registry task until it is quiesced and ensure that the component is now live:
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // Since the probe response was received, we should have a pending scheduled probe now since this is a "normal" probe,
        // and isn't the initial probe request which is scheduled immediately:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }
882
    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        // Late-response path: the probe timeout fires before the component answers. The
        // component should stay not-live until the late response arrives, and the late
        // response must not schedule an extra probe on top of the one already queued.
        //
        // Create our registry with a registered component:
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
        // which means the component hasn't been registered yet and no probe request has been sent:
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Drive our registry task until it is quiesced to ensure the component is registered and that a probe request is sent:
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
        assert!(!component_live(&registry_state, COMPONENT_ID));

        // After polling the registry task, we should have sent a probe request which will have now woken up
        // our `live` future, but we won't yet poll it. In fact, we'll advance time _past_ the probe timeout to simulate
        // the probe timeout expiring:
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // The registry task should have been woken by the probe timeout expiring.
        //
        // Drive the registry task until it is quiesced and ensure that the component is still not live:
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // Since the probe response was not received, we should have a pending scheduled probe now since this is a "normal" probe,
        // and isn't the initial probe request which is scheduled immediately:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // Now, we'll actually drive the `live` future to respond to the probe request, which should mark the component as live:
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // However, since the first probe response timed out, and we haven't yet fired off our scheduled probe, receiving this late
        // response should not trigger the scheduling of another probe:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }
937
938    #[track_caller]
939    #[allow(clippy::type_complexity)]
940    fn initialize_registry_with_component_and_shutdown(
941        component_id: &str,
942    ) -> (
943        Health,
944        Spawn<impl Future<Output = ()>>,
945        Arc<Mutex<RegistryState>>,
946        Arc<RunnerState>,
947        oneshot::Sender<()>,
948    ) {
949        let registry = HealthRegistry::new();
950        let registry_state = registry.state();
951        let handle = registry.register_component(component_id).unwrap();
952        let runner = registry.into_runner().expect("should not fail to create runner");
953        let runner_state = runner.state();
954
955        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
956        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));
957
958        (handle, registry_task, registry_state, runner_state, shutdown_tx)
959    }
960
    #[tokio::test(start_paused = true)]
    async fn respawn_preserves_fresh_component_health() {
        // When a runner is shut down and a new one is created without any time passing, a
        // component whose last probe response is still fresh should remain live.
        //
        // Create our registry with a registered component and drive the runner until the initial probe is sent:
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Respond to the probe request so the component becomes live:
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut down the runner gracefully, which returns the response receiver to the registry state:
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Respawn the runner immediately (no time advance), so the probe result is still fresh:
        //
        // NOTE: we rebuild `HealthRegistry` directly from the shared state so the respawned
        // runner observes the same components as the original.
        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        // Run the new runner with a never-completing shutdown future and drive it until idle:
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The component's health should be preserved as Live since its last response is fresh:
        assert!(component_live(&registry_state, COMPONENT_ID));
    }
992
    #[tokio::test(start_paused = true)]
    async fn respawn_resets_stale_component_health() {
        // When a runner is shut down and enough time passes that the component's last probe
        // response exceeds the probe timeout, a respawned runner should treat the component's
        // health as unknown (not live) rather than trusting the stale response.
        //
        // Create our registry with a registered component and drive the runner until the initial probe is sent:
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Respond to the probe request so the component becomes live:
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut down the runner gracefully, which returns the response receiver to the registry state:
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Advance time past the probe timeout so the last response becomes stale:
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // Respawn the runner. The stale component should be reset to Unknown:
        //
        // NOTE: we rebuild `HealthRegistry` directly from the shared state so the respawned
        // runner observes the same components as the original.
        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        // Run the new runner with a never-completing shutdown future and drive it until idle:
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The component's health should have been reset to Unknown since its last response is stale:
        assert!(!component_live(&registry_state, COMPONENT_ID));
    }
1027}