Skip to main content

saluki_core/health/
mod.rs

1//! Health registry for tracking component readiness and liveness.
2
3use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7    collections::HashSet,
8    sync::{
9        atomic::{AtomicBool, Ordering::Relaxed},
10        Arc, Mutex,
11    },
12    time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::time::Instant;
20use tokio::{
21    select,
22    sync::{
23        mpsc::{self, error::TrySendError},
24        Notify,
25    },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long the runner waits for a response to a liveness probe before the probe's entry in the timeout queue
/// expires. Also reported as the liveness latency when a component is marked as not live or dead.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
/// Delay before the next liveness probe is scheduled for a component after a probe response or a probe timeout.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
38
/// A handle for updating the health of a component.
///
/// The handle shares its readiness flag and telemetry with the registry's per-component state, and owns the
/// component side of the liveness probe channels.
pub struct Health {
    /// Readiness flag and telemetry shared with the registry's `ComponentState` for this component.
    shared: Arc<SharedComponentState>,
    /// Receives liveness probe requests for this component.
    request_rx: mpsc::Receiver<LivenessRequest>,
    /// Sends liveness probe responses back towards the registry.
    response_tx: mpsc::Sender<LivenessResponse>,
    /// Notified when this component is marked ready, to wake tasks waiting in `HealthRegistry::all_ready`.
    readiness_notify: Arc<Notify>,
}
46
47impl Health {
48    /// Marks the component as ready.
49    pub fn mark_ready(&mut self) {
50        self.update_readiness(true);
51    }
52
53    /// Marks the component as not ready.
54    pub fn mark_not_ready(&mut self) {
55        self.update_readiness(false);
56    }
57
58    fn update_readiness(&self, ready: bool) {
59        self.shared.ready.store(ready, Relaxed);
60        self.shared.telemetry.update_readiness(ready);
61
62        // Wake any tasks waiting in `HealthRegistry::all_ready` so they can re-check whether all components are ready.
63        if ready {
64            self.readiness_notify.notify_waiters();
65        }
66    }
67
68    /// Waits for a liveness probe to be sent to the component, and then responds to it.
69    ///
70    /// This should generally be polled as part of a `select!` block to ensure it's checked alongside other
71    /// asynchronous operations.
72    pub async fn live(&mut self) {
73        // Simply wait for the health registry to send us a liveness probe, and if we receive one, we respond back to it
74        // immediately.
75        if let Some(request) = self.request_rx.recv().await {
76            let response = request.into_response();
77            let _ = self.response_tx.send(response).await;
78        }
79    }
80}
81
/// Liveness state tracked for a registered component.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HealthState {
    /// The component answered its most recent liveness probe.
    Live,
    /// Liveness is not currently known: this is the initial state, and the state set after a probe timeout.
    Unknown,
    /// The component's probe request channel is closed.
    Dead,
}
88
// Declares the `Telemetry` type used for per-component health metrics. The accessors called in `impl Telemetry`
// (`component_ready`, `component_live`, `component_liveness_latency_seconds`) correspond to the metrics listed here.
// NOTE(review): metric names are presumably formed as `<prefix>.<metric>` (e.g. `health.component_ready`) -- confirm
// against `saluki_metrics::static_metrics`.
static_metrics!(
    name => Telemetry,
    prefix => health,
    labels => [component_id: Arc<str>],
    metrics => [
        gauge(component_ready),
        gauge(component_live),
        trace_histogram(component_liveness_latency_seconds),
    ]
);
99
100impl Telemetry {
101    fn from_name(name: &str) -> Self {
102        Self::new(Arc::from(name))
103    }
104
105    fn update_readiness(&self, ready: bool) {
106        self.component_ready().set(if ready { 1.0 } else { 0.0 });
107    }
108
109    fn update_liveness(&self, state: HealthState, response_latency: Duration) {
110        let live = match state {
111            HealthState::Live => 1.0,
112            HealthState::Unknown => 0.0,
113            HealthState::Dead => -1.0,
114        };
115
116        self.component_live().set(live);
117        self.component_liveness_latency_seconds()
118            .record(response_latency.as_secs_f64());
119    }
120}
121
/// State shared between a component's [`Health`] handle and the registry's `ComponentState` for that component.
struct SharedComponentState {
    /// Readiness flag; written through the `Health` handle and read by `ComponentState::is_ready`.
    ready: AtomicBool,
    /// Per-component metrics handle.
    telemetry: Telemetry,
}
126
/// Registry-side bookkeeping for a single registered component.
struct ComponentState {
    /// Name the component was registered with.
    name: MetaString,
    /// Most recently recorded liveness state.
    health: HealthState,
    /// Readiness flag and telemetry shared with the component's `Health` handle.
    shared: Arc<SharedComponentState>,
    /// Sends liveness probe requests to the component's `Health` handle.
    request_tx: mpsc::Sender<LivenessRequest>,
    /// When the last probe response was sent; initialized to the creation time of this state.
    last_response: Instant,
    /// Latency of the last probe response; zero until a response has been recorded.
    last_response_latency: Duration,
}
135
136impl ComponentState {
137    fn new(
138        name: MetaString, response_tx: mpsc::Sender<LivenessResponse>, readiness_notify: Arc<Notify>,
139    ) -> (Self, Health) {
140        let shared = Arc::new(SharedComponentState {
141            ready: AtomicBool::new(false),
142            telemetry: Telemetry::from_name(&name),
143        });
144        let (request_tx, request_rx) = mpsc::channel(1);
145
146        let state = Self {
147            name,
148            health: HealthState::Unknown,
149            shared: Arc::clone(&shared),
150            request_tx,
151            last_response: Instant::now(),
152            last_response_latency: Duration::from_secs(0),
153        };
154
155        let handle = Health {
156            shared,
157            request_rx,
158            response_tx,
159            readiness_notify,
160        };
161
162        (state, handle)
163    }
164
165    fn is_ready(&self) -> bool {
166        // We consider a component ready if it's marked as ready (duh) and it's not dead.
167        //
168        // Being "dead" is a special case as it means the component is very likely not even running at all, not just
169        // responding slowly or deadlocked. In these cases, it can't possibly be ready since it's not even running.
170        self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
171    }
172
173    fn is_live(&self) -> bool {
174        self.health == HealthState::Live
175    }
176
177    fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
178        self.health = HealthState::Live;
179        self.last_response = response_sent;
180        self.last_response_latency = response_latency;
181        self.shared.telemetry.update_liveness(self.health, response_latency);
182    }
183
184    fn mark_not_live(&mut self) {
185        self.health = HealthState::Unknown;
186
187        // We use the default timeout as the latency for when the component is not considered alive.
188        self.shared
189            .telemetry
190            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191    }
192
193    fn mark_dead(&mut self) {
194        self.health = HealthState::Dead;
195
196        // We use the default timeout as the latency for when the component is not considered alive.
197        self.shared
198            .telemetry
199            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
200    }
201}
202
/// A liveness probe sent from the registry runner to a component.
struct LivenessRequest {
    /// Index of the target component in `RegistryState::component_state`.
    component_id: usize,
    /// Key of the timeout-queue entry registered for this probe.
    timeout_key: Key,
    /// When the request was created.
    request_sent: Instant,
}
208
209impl LivenessRequest {
210    fn new(component_id: usize, timeout_key: Key) -> Self {
211        Self {
212            component_id,
213            timeout_key,
214            request_sent: Instant::now(),
215        }
216    }
217
218    fn into_response(self) -> LivenessResponse {
219        LivenessResponse {
220            request: self,
221            response_sent: Instant::now(),
222        }
223    }
224}
225
/// A component's answer to a liveness probe.
struct LivenessResponse {
    /// The request being answered, carried back so the runner can find the component and its timeout entry.
    request: LivenessRequest,
    /// When the response was created.
    response_sent: Instant,
}
230
/// A change to apply to a component's liveness state.
enum HealthUpdate {
    /// The component answered a probe.
    Alive {
        /// When the response was sent.
        last_response: Instant,
        /// Time between the request being sent and the response being sent.
        last_response_latency: Duration,
    },
    /// The component's liveness is not currently known.
    Unknown,
    /// The component is dead.
    Dead,
}

impl HealthUpdate {
    /// Returns a short, static label for this update, used as the `status` field in log events.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Dead => "dead",
            Self::Unknown => "unknown",
            Self::Alive { .. } => "alive",
        }
    }
}
249
/// Mutable state behind a [`HealthRegistry`], shared by all of its clones.
struct RegistryState {
    /// Names of all registered components, used to reject duplicate registrations.
    registered_components: HashSet<MetaString>,
    /// Per-component state; a component's ID is its index in this vector.
    component_state: Vec<ComponentState>,
    /// Sender for probe responses; a clone is handed to every `Health` handle.
    responses_tx: mpsc::Sender<LivenessResponse>,
    /// Receiver for probe responses; `None` while a runner has taken it.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
    /// IDs of components that were registered but not yet picked up by the runner.
    pending_components: Vec<usize>,
    /// Signaled whenever a component is added to `pending_components`.
    pending_components_notify: Arc<Notify>,
    /// Shared with every `Health` handle; notified when a component is marked ready.
    readiness_notify: Arc<Notify>,
}
259
260impl RegistryState {
261    fn new() -> Self {
262        let (responses_tx, responses_rx) = mpsc::channel(16);
263
264        Self {
265            registered_components: HashSet::new(),
266            component_state: Vec::new(),
267            responses_tx,
268            responses_rx: Some(responses_rx),
269            pending_components: Vec::new(),
270            pending_components_notify: Arc::new(Notify::new()),
271            readiness_notify: Arc::new(Notify::new()),
272        }
273    }
274}
275
/// A registry of components and their health.
///
/// `HealthRegistry` is responsible for tracking the health of all registered components, by storing both their
/// readiness, which indicates whether or not they're initialized and generally ready to process data, as well as
/// probing their liveness, which indicates if they're currently responding, or able to respond, to requests.
///
/// # Telemetry
///
/// The health registry emits some internal telemetry about the status of registered components. In particular, three
/// metrics are emitted:
///
/// - `health.component_ready`: whether or not a component is ready (`gauge`, `0` for not ready, `1` for ready)
/// - `health.component_live`: whether or not a component is live (`gauge`, `0` for not live/unknown, `1` for live,
///   `-1` for dead)
/// - `health.component_liveness_latency_seconds`: the response latency of the component for liveness probes
///   (`histogram`, in seconds)
///
/// All metrics have a `component_id` tag that corresponds to the name of the component that was given when registering it.
///
/// Cloning a `HealthRegistry` is cheap: all clones share the same underlying state.
#[derive(Clone)]
pub struct HealthRegistry {
    /// Shared registry state.
    inner: Arc<Mutex<RegistryState>>,
}
297
298impl HealthRegistry {
299    /// Creates an empty registry.
300    pub fn new() -> Self {
301        Self {
302            inner: Arc::new(Mutex::new(RegistryState::new())),
303        }
304    }
305
306    #[cfg(test)]
307    fn state(&self) -> Arc<Mutex<RegistryState>> {
308        Arc::clone(&self.inner)
309    }
310
311    /// Registers a component with the registry.
312    ///
313    /// A handle is returned that must be used by the component to set its readiness as well as respond to liveness
314    /// probes. See [`Health::mark_ready`], [`Health::mark_not_ready`], and [`Health::live`] for more information.
315    pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
316        let mut inner = self.inner.lock().unwrap();
317
318        // Make sure we don't already have this component registered.
319        let name = name.into();
320        if !inner.registered_components.insert(name.clone()) {
321            return None;
322        }
323
324        // Add the component state.
325        let readiness_notify = Arc::clone(&inner.readiness_notify);
326        let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone(), readiness_notify);
327        let component_id = inner.component_state.len();
328        inner.component_state.push(state);
329
330        debug!(component_id, "Registered component '{}'.", name);
331
332        // Mark ourselves as having a pending component that needs to be scheduled.
333        inner.pending_components.push(component_id);
334        inner.pending_components_notify.notify_one();
335
336        Some(handle)
337    }
338
339    /// Gets an API handler for reporting the health of all components.
340    ///
341    /// This handler exposes routes for querying the readiness and liveness of all registered components. See
342    /// [`HealthAPIHandler`] for more information about routes and responses.
343    pub fn api_handler(&self) -> HealthAPIHandler {
344        HealthAPIHandler::from_state(Arc::clone(&self.inner))
345    }
346
347    /// Waits until all registered components are ready.
348    ///
349    /// If no components are registered, or all currently registered components are ready, the method returns immediately. Otherwise,
350    /// the method will return as soon as all registered components transition to ready.
351    ///
352    /// Note that components can be registered while this method is waiting, which will influence how long this method
353    /// takes to return. Callers should ensure that all components have been registered before calling this method.
354    pub async fn all_ready(&self) {
355        let readiness_notify = {
356            let inner = self.inner.lock().unwrap();
357            Arc::clone(&inner.readiness_notify)
358        };
359
360        loop {
361            // Register as a waiter _before_ checking to avoid missing notifications during the check.
362            let notified = readiness_notify.notified();
363
364            if self.check_all_ready() {
365                return;
366            }
367
368            notified.await;
369        }
370    }
371
372    fn check_all_ready(&self) -> bool {
373        let inner = self.inner.lock().unwrap();
374        inner.component_state.iter().all(|component| component.is_ready())
375    }
376
377    /// Creates a [`HealthRegistryWorker`] that can be added to a supervisor to run the health registry.
378    ///
379    /// The worker handles the lifecycle of the health registry runner, including registering the health API routes
380    /// dynamically and running the liveness probing event loop.
381    pub fn worker(&self) -> HealthRegistryWorker {
382        HealthRegistryWorker::new(self.clone())
383    }
384
385    pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
386        // Make sure the runner hasn't already been spawned.
387        let (responses_rx, pending_components_notify) = {
388            let mut inner = self.inner.lock().unwrap();
389            let responses_rx = match inner.responses_rx.take() {
390                Some(rx) => rx,
391                None => return Err(generic_error!("health registry already spawned")),
392            };
393
394            let pending_components_notify = Arc::clone(&inner.pending_components_notify);
395            (responses_rx, pending_components_notify)
396        };
397
398        Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
399    }
400}
401
/// A guard that returns the response receiver back to the registry when dropped.
///
/// This allows the health registry runner to be restarted gracefully: whenever the runner task
/// finishes and this guard is dropped (for example, after a shutdown or task cancellation), the
/// receiver is returned to the registry state so that a subsequent call to `spawn()` can succeed.
struct RunnerGuard {
    /// Registry state that the receiver is handed back to on drop.
    registry: Arc<Mutex<RegistryState>>,
    /// The receiver being guarded; when `None`, dropping the guard does nothing.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
}
411
412impl Drop for RunnerGuard {
413    fn drop(&mut self) {
414        if let Some(rx) = self.responses_rx.take() {
415            let mut inner = self.registry.lock().expect("registry state poisoned");
416            inner.responses_rx = Some(rx);
417            debug!("Returned response receiver to registry state.");
418        }
419    }
420}
421
/// Test-only counters that mirror how many entries the runner has in its probe and timeout queues.
#[cfg(test)]
struct RunnerState {
    /// Number of probes scheduled but not yet sent.
    pending_scheduled_probes: AtomicUsize,
    /// Number of probe timeouts registered but not yet expired or cleared.
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates a state with both counters at zero.
    fn new() -> Self {
        let zeroed = || AtomicUsize::new(0);
        Self {
            pending_scheduled_probes: zeroed(),
            pending_probe_timeouts: zeroed(),
        }
    }

    /// Current number of scheduled probes.
    fn pending_scheduled_probes(&self) -> usize {
        let counter = &self.pending_scheduled_probes;
        counter.load(Relaxed)
    }

    /// Current number of pending probe timeouts.
    fn pending_probe_timeouts(&self) -> usize {
        let counter = &self.pending_probe_timeouts;
        counter.load(Relaxed)
    }

    /// Adds one to the scheduled probe counter.
    fn increment_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_add(1, Relaxed);
    }

    /// Adds one to the pending timeout counter.
    fn increment_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the scheduled probe counter.
    fn decrement_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_sub(1, Relaxed);
    }

    /// Subtracts one from the pending timeout counter.
    fn decrement_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_sub(1, Relaxed);
    }
}
461
/// Drives liveness probing for all components registered in a [`HealthRegistry`].
pub(super) struct Runner {
    /// Registry state shared with the `HealthRegistry` handles.
    registry: Arc<Mutex<RegistryState>>,
    /// Component IDs waiting for their next probe to be sent, keyed by delay.
    pending_probes: DelayQueue<usize>,
    /// Component IDs with an outstanding probe, keyed by the probe's timeout.
    pending_timeouts: DelayQueue<usize>,
    /// Holds the probe response receiver and returns it to the registry state when dropped.
    guard: RunnerGuard,
    /// Signaled when components have been added to `RegistryState::pending_components`.
    pending_components_notify: Arc<Notify>,
    /// Test-only counters tracking the sizes of the two delay queues.
    #[cfg(test)]
    state: Arc<RunnerState>,
}
471
472impl Runner {
473    fn new(
474        registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
475        pending_components_notify: Arc<Notify>,
476    ) -> Self {
477        #[cfg(test)]
478        let state = Arc::new(RunnerState::new());
479
480        let guard = RunnerGuard {
481            registry: Arc::clone(&registry),
482            responses_rx: Some(responses_rx),
483        };
484
485        Self {
486            registry,
487            pending_probes: DelayQueue::new(),
488            pending_timeouts: DelayQueue::new(),
489            guard,
490            pending_components_notify,
491            #[cfg(test)]
492            state,
493        }
494    }
495
496    #[cfg(test)]
497    fn state(&self) -> Arc<RunnerState> {
498        Arc::clone(&self.state)
499    }
500
501    fn drain_pending_components(&mut self) -> Vec<usize> {
502        // Drain all pending components.
503        let mut registry = self.registry.lock().unwrap();
504        registry.pending_components.drain(..).collect()
505    }
506
507    fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
508        let mut registry = self.registry.lock().unwrap();
509        let component_state = &mut registry.component_state[component_id];
510
511        // Check if our component is already dead, in which case we don't need to send a liveness probe.
512        if component_state.request_tx.is_closed() {
513            debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
514            return Some(HealthUpdate::Dead);
515        }
516
517        trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
518
519        // Our component _isn't_ dead, so try to send a liveness probe to it.
520        //
521        // We'll register an entry in `pending_timeouts` that automatically marks the component as not live if we don't
522        // receive a response to the liveness probe within the timeout duration.
523        let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
524
525        #[cfg(test)]
526        self.state.increment_pending_probe_timeouts();
527
528        let request = LivenessRequest::new(component_id, timeout_key);
529        if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
530            debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
531
532            // We failed to send the probe to the component due to the component being dead. We'll drop our pending
533            // timeout as we're going to mark this component dead right now.
534            //
535            // When our send fails due to the channel being full, that's OK: it means it's going to be handled by an
536            // existing timeout and will be probed again later.
537            self.pending_timeouts.remove(&request.timeout_key);
538
539            #[cfg(test)]
540            self.state.decrement_pending_probe_timeouts();
541
542            return Some(HealthUpdate::Dead);
543        }
544
545        None
546    }
547
548    fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
549        #[cfg(test)]
550        self.state.increment_pending_scheduled_probes();
551
552        self.pending_probes.insert(component_id, duration);
553    }
554
555    fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
556        // First, drain any pending components to avoid scheduling them twice.
557        // This handles the case where components were registered before the runner started.
558        let _pending = self.drain_pending_components();
559
560        // Drain any queued probe responses from the previous runner. These responses were sent by
561        // components before the runner shut down but weren't processed. Processing them now updates
562        // `last_response` timestamps, which affects the staleness check below — a response that
563        // arrived just before shutdown should count as fresh.
564        while let Ok(response) = responses_rx.try_recv() {
565            self.handle_component_probe_response(response);
566        }
567
568        // Determine which components have stale probe results. Components whose last response is
569        // within the probe timeout are considered fresh and their health state is preserved,
570        // avoiding unnecessary bursts of failed liveness/readiness probes on runner restart.
571        let (component_count, stale_component_ids) = {
572            let registry = self.registry.lock().unwrap();
573            let now = Instant::now();
574            let stale_ids: Vec<usize> = (0..registry.component_state.len())
575                .filter(|&id| {
576                    now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
577                })
578                .collect();
579            (registry.component_state.len(), stale_ids)
580        };
581
582        // Only reset health to Unknown for components with stale probe results.
583        for &component_id in &stale_component_ids {
584            self.process_component_health_update(component_id, HealthUpdate::Unknown);
585        }
586
587        // Schedule immediate probes for all components regardless of staleness.
588        for component_id in 0..component_count {
589            self.schedule_probe_for_component(component_id, Duration::ZERO);
590        }
591
592        if component_count > 0 {
593            let fresh_count = component_count - stale_component_ids.len();
594            debug!(
595                component_count,
596                fresh_count,
597                stale_count = stale_component_ids.len(),
598                "Scheduled probes for all existing components."
599            );
600        }
601    }
602
603    fn handle_component_probe_response(&mut self, response: LivenessResponse) {
604        let component_id = response.request.component_id;
605        let timeout_key = response.request.timeout_key;
606        let request_sent = response.request.request_sent;
607        let response_sent = response.response_sent;
608        let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
609
610        // Clear any pending timeouts for this component and schedule the next probe.
611        let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
612        if !timeout_was_pending {
613            let mut registry = self.registry.lock().unwrap();
614            let component_state = &mut registry.component_state[component_id];
615
616            debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
617        }
618
619        // Update the component's health to show as alive.
620        let update = HealthUpdate::Alive {
621            last_response: response_sent,
622            last_response_latency: response_latency,
623        };
624        self.process_component_health_update(component_id, update);
625
626        // Only schedule the next probe if we successfully removed the timeout, meaning it hadn't fired yet.
627        // This prevents duplicate probe scheduling when a response arrives after a timeout.
628        if timeout_was_pending {
629            #[cfg(test)]
630            self.state.decrement_pending_probe_timeouts();
631
632            self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
633        }
634    }
635
636    fn handle_component_timeout(&mut self, component_id: usize) {
637        // Update the component's health to show as not alive.
638        self.process_component_health_update(component_id, HealthUpdate::Unknown);
639
640        // Schedule the next probe for this component.
641        self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
642    }
643
644    fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
645        // Update the component's health state based on the given update.
646        let mut registry = self.registry.lock().unwrap();
647        let component_state = &mut registry.component_state[component_id];
648        trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
649
650        match update {
651            HealthUpdate::Alive {
652                last_response,
653                last_response_latency,
654            } => component_state.mark_live(last_response, last_response_latency),
655            HealthUpdate::Unknown => component_state.mark_not_live(),
656            HealthUpdate::Dead => component_state.mark_dead(),
657        }
658    }
659
660    async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
661        info!("Health checker running.");
662
663        // Take the response receiver out of the guard so we can use it in the select loop.
664        // It will be put back when the guard is dropped.
665        let mut responses_rx = self
666            .guard
667            .responses_rx
668            .take()
669            .expect("responses_rx should always be Some when Runner is created");
670
671        // Schedule probes for all existing components. This allows the runner to "pick up where it
672        // left off" after a restart - any components that were registered before the runner was
673        // restarted will be immediately probed.
674        self.schedule_all_existing_components(&mut responses_rx);
675
676        // Pin the shutdown future so we can poll it in the select loop.
677        let mut shutdown = std::pin::pin!(shutdown);
678
679        loop {
680            select! {
681                // Shutdown signal received - exit the run loop gracefully.
682                _ = &mut shutdown => {
683                    info!("Health checker shutting down.");
684                    break;
685                },
686
687                // A component has been scheduled to have a liveness probe sent to it.
688                Some(entry) = self.pending_probes.next() => {
689                    #[cfg(test)]
690                    self.state.decrement_pending_scheduled_probes();
691
692                    let component_id = entry.into_inner();
693                    if let Some(health_update) = self.send_component_probe_request(component_id) {
694                        // If we got a health update for this component, that means we detected that it's dead, so we need
695                        // to do an out-of-band update to its health.
696                        self.process_component_health_update(component_id, health_update);
697                    }
698                },
699
700                // A component's outstanding liveness probe has expired.
701                Some(entry) = self.pending_timeouts.next() => {
702                    #[cfg(test)]
703                    self.state.decrement_pending_probe_timeouts();
704
705                    let component_id = entry.into_inner();
706                    self.handle_component_timeout(component_id);
707                },
708
709                // A probe response has been received.
710                Some(response) = responses_rx.recv() => {
711                    self.handle_component_probe_response(response);
712                },
713
714                // A component is pending finalization of their registration.
715                _ = self.pending_components_notify.notified() => {
716                    // Drain all pending components, give them a clean initial state of "unknown", and immediately schedule a probe for them.
717                    let pending_component_ids = self.drain_pending_components();
718                    for pending_component_id in pending_component_ids {
719                        self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
720                        self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
721                    }
722                },
723            }
724        }
725
726        // Put the receiver back in the guard so it can be returned to the registry state when dropped.
727        self.guard.responses_rx = Some(responses_rx);
728
729        // When we exit the loop, the RunnerGuard will be dropped, returning the response receiver
730        // back to the registry state so that a subsequent spawn() can succeed.
731    }
732}
733
734#[cfg(test)]
735mod tests {
736    use std::future::Future;
737
738    use futures::FutureExt as _;
739    use tokio::sync::oneshot;
740    use tokio_test::{
741        assert_pending, assert_ready,
742        task::{spawn, Spawn},
743    };
744
745    use super::*;
746
747    const COMPONENT_ID: &str = "test_component";
748
749    #[track_caller]
750    fn initialize_registry_with_component(
751        component_id: &str,
752    ) -> (
753        Health,
754        Spawn<impl Future<Output = ()>>,
755        Arc<Mutex<RegistryState>>,
756        Arc<RunnerState>,
757    ) {
758        let registry = HealthRegistry::new();
759        let registry_state = registry.state();
760
761        // Add our component to the registry:
762        let handle = registry.register_component(component_id).unwrap();
763
764        // Extract the registry runner task and poll it until it's quiesced.
765        //
766        // This ensures that the component is registered, and that it schedules/sends an initial probe request to the component:
767        let runner = registry.into_runner().expect("should not fail to create runner");
768        let runner_state = runner.state();
769
770        // Create a shutdown future that never resolves (for tests that don't need shutdown).
771        let shutdown = std::future::pending();
772        let registry_task = spawn(runner.run(shutdown));
773
774        (handle, registry_task, registry_state, runner_state)
775    }
776
777    #[track_caller]
778    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
779        assert_pending!(task.poll());
780        while task.is_woken() {
781            assert_pending!(task.poll());
782        }
783    }
784
785    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
786        let state = state.lock().unwrap();
787        state
788            .component_state
789            .iter()
790            .find(|state| state.name == component_id)
791            .map(|state| state.is_live())
792            .unwrap()
793    }
794
795    #[test]
796    fn basic_registration() {
797        let registry = HealthRegistry::new();
798        assert!(registry.register_component(COMPONENT_ID).is_some());
799    }
800
801    #[test]
802    fn duplicate_component_registration_fails() {
803        let registry = HealthRegistry::new();
804
805        // Registering the same component twice should fail:
806        assert!(registry.register_component(COMPONENT_ID).is_some());
807        assert!(registry.register_component(COMPONENT_ID).is_none());
808    }
809
810    #[test]
811    fn duplicate_runner_creation_fails_while_running() {
812        let registry = HealthRegistry::new();
813        let registry2 = registry.clone();
814
815        // First runner creation should succeed. We hold on to it so the RunnerGuard doesn't
816        // return the receiver back to the registry state.
817        let _runner = registry.into_runner().expect("first runner creation should succeed");
818
819        // Second creation should fail while the first runner still holds the receiver.
820        assert!(registry2.into_runner().is_err());
821    }
822
823    #[tokio::test]
824    async fn registry_can_be_respawned_after_shutdown() {
825        let registry = HealthRegistry::new();
826        let registry2 = registry.clone();
827        let registry3 = registry.clone();
828
829        // First runner creation should succeed.
830        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
831        let runner = registry.into_runner().expect("first runner creation should succeed");
832
833        // Run the runner on a spawned task so we can trigger shutdown.
834        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));
835
836        // Trigger shutdown.
837        let _ = shutdown_tx.send(());
838
839        // Wait for the runner to stop.
840        join_handle.await.expect("runner should complete without panic");
841
842        // Now we should be able to create a runner again (the RunnerGuard returned the receiver).
843        let _runner2 = registry2
844            .into_runner()
845            .expect("should be able to create runner after shutdown");
846
847        // But not a third time while the second runner holds the receiver.
848        assert!(
849            registry3.into_runner().is_err(),
850            "should not be able to create runner while one exists"
851        );
852    }
853
854    #[test]
855    fn readiness() {
856        let registry = HealthRegistry::new();
857
858        // An empty registry is always ready, so `all_ready` resolves immediately:
859        let mut all_ready_fut = spawn(registry.all_ready());
860        assert_ready!(all_ready_fut.poll());
861
862        // Components start out as not ready, so adding this component changes the registry to not ready overall:
863        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
864
865        let mut all_ready_fut = spawn(registry.all_ready());
866        assert_pending!(all_ready_fut.poll());
867
868        // Now mark the component as ready. `all_ready` should resolve on the next poll:
869        handle.mark_ready();
870
871        assert!(all_ready_fut.is_woken());
872        assert_ready!(all_ready_fut.poll());
873
874        // Ensure a fresh `all_ready` call immediately observes all components being ready:
875        let mut all_ready_fut = spawn(registry.all_ready());
876        assert_ready!(all_ready_fut.poll());
877
878        // Finally, make sure that the readiness state isn't latched, as `all_ready` should always reflect the current state:
879        handle.mark_not_ready();
880
881        let mut all_ready_fut = spawn(registry.all_ready());
882        assert_pending!(all_ready_fut.poll());
883    }
884
885    #[tokio::test(start_paused = true)]
886    async fn component_responds_before_timeout() {
887        // Create our registry with a registered component:
888        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);
889
890        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
891        // which means the component hasn't been registered yet and no probe request has been sent:
892        let mut live_future = spawn(handle.live());
893        assert_pending!(live_future.poll());
894        assert_eq!(runner_state.pending_probe_timeouts(), 0);
895        assert_eq!(runner_state.pending_scheduled_probes(), 0);
896
897        // Drive our registry task until it us quiesced to ensure the component is registered and that a probe request is sent:
898        drive_until_quiesced(&mut registry);
899        assert_eq!(runner_state.pending_probe_timeouts(), 1);
900        assert_eq!(runner_state.pending_scheduled_probes(), 0);
901
902        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
903        assert!(!component_live(&registry_state, COMPONENT_ID));
904
905        // After polling the registry task, we should have sent a probe request which will have now woken up our `live` future.
906        //
907        // Poll the future which should then respond to the probe request:
908        assert!(live_future.is_woken());
909        assert_ready!(live_future.poll());
910
911        // The registry task should have been woken by the probe response.
912        //
913        // Drive the registry task until it is quiesced and ensure that the component is now live:
914        assert!(registry.is_woken());
915        drive_until_quiesced(&mut registry);
916
917        assert!(component_live(&registry_state, COMPONENT_ID));
918
919        // Since the probe response was received, we should have a pending schedule probe now since this is a "normal" probe now,
920        // and isn't the initial probe request which is scheduled immediately:
921        assert_eq!(runner_state.pending_probe_timeouts(), 0);
922        assert_eq!(runner_state.pending_scheduled_probes(), 1);
923    }
924
925    #[tokio::test(start_paused = true)]
926    async fn component_responds_after_timeout() {
927        // Create our registry with a registered component:
928        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);
929
930        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
931        // which means the component hasn't been registered yet and no probe request has been sent:
932        let mut live_future = spawn(handle.live());
933        assert_pending!(live_future.poll());
934        assert_eq!(runner_state.pending_probe_timeouts(), 0);
935        assert_eq!(runner_state.pending_scheduled_probes(), 0);
936
937        // Drive our registry task until it us quiesced to ensure the component is registered and that a probe request is sent:
938        drive_until_quiesced(&mut registry);
939        assert_eq!(runner_state.pending_probe_timeouts(), 1);
940        assert_eq!(runner_state.pending_scheduled_probes(), 0);
941
942        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
943        assert!(!component_live(&registry_state, COMPONENT_ID));
944
945        // After polling the registry task, we should have sent a probe request which will have now woken up
946        // our `live` future, but we won't yet poll it. In fact, we'll advance time _past_ the probe timeout to simulate
947        // the probe timeout expiring:
948        assert!(live_future.is_woken());
949        assert!(!registry.is_woken());
950
951        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;
952
953        // The registry task should have been woken by the probe timeout expiring.
954        //
955        // Drive the registry task until it is quiesced and ensure that the component is still not live:
956        assert!(registry.is_woken());
957        drive_until_quiesced(&mut registry);
958
959        assert!(!component_live(&registry_state, COMPONENT_ID));
960
961        // Since the probe response was not received, we should have a pending schedule probe now since this is a "normal" probe now,
962        // and isn't the initial probe request which is scheduled immediately:
963        assert_eq!(runner_state.pending_probe_timeouts(), 0);
964        assert_eq!(runner_state.pending_scheduled_probes(), 1);
965
966        // Now, we'll actually drive the `live` future to respond to the probe request, which should mark the component as live:
967        assert_ready!(live_future.poll());
968
969        assert!(registry.is_woken());
970        drive_until_quiesced(&mut registry);
971
972        assert!(component_live(&registry_state, COMPONENT_ID));
973
974        // However, since the first probe response timed out, and we haven't yet fired off our scheduled probe, receiving this late
975        // response should not trigger the scheduling of another probe:
976        assert_eq!(runner_state.pending_probe_timeouts(), 0);
977        assert_eq!(runner_state.pending_scheduled_probes(), 1);
978    }
979
980    #[track_caller]
981    #[allow(clippy::type_complexity)]
982    fn initialize_registry_with_component_and_shutdown(
983        component_id: &str,
984    ) -> (
985        Health,
986        Spawn<impl Future<Output = ()>>,
987        Arc<Mutex<RegistryState>>,
988        Arc<RunnerState>,
989        oneshot::Sender<()>,
990    ) {
991        let registry = HealthRegistry::new();
992        let registry_state = registry.state();
993        let handle = registry.register_component(component_id).unwrap();
994        let runner = registry.into_runner().expect("should not fail to create runner");
995        let runner_state = runner.state();
996
997        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
998        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));
999
1000        (handle, registry_task, registry_state, runner_state, shutdown_tx)
1001    }
1002
1003    #[tokio::test(start_paused = true)]
1004    async fn respawn_preserves_fresh_component_health() {
1005        // Create our registry with a registered component and drive the runner until the initial probe is sent:
1006        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
1007            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
1008        drive_until_quiesced(&mut registry);
1009
1010        // Respond to the probe request so the component becomes live:
1011        let mut live_future = spawn(handle.live());
1012        assert_ready!(live_future.poll());
1013        drive_until_quiesced(&mut registry);
1014        assert!(component_live(&registry_state, COMPONENT_ID));
1015
1016        // Shut down the runner gracefully, which returns the response receiver to the registry state:
1017        let _ = shutdown_tx.send(());
1018        assert_ready!(registry.poll());
1019
1020        // Respawn the runner immediately (no time advance), so the probe result is still fresh:
1021        let registry = HealthRegistry {
1022            inner: Arc::clone(&registry_state),
1023        };
1024        let runner = registry
1025            .into_runner()
1026            .expect("should be able to respawn after shutdown");
1027        let _runner_state = runner.state();
1028        let mut registry = spawn(runner.run(std::future::pending()));
1029        drive_until_quiesced(&mut registry);
1030
1031        // The component's health should be preserved as Live since its last response is fresh:
1032        assert!(component_live(&registry_state, COMPONENT_ID));
1033    }
1034
1035    #[tokio::test(start_paused = true)]
1036    async fn respawn_resets_stale_component_health() {
1037        // Create our registry with a registered component and drive the runner until the initial probe is sent:
1038        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
1039            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
1040        drive_until_quiesced(&mut registry);
1041
1042        // Respond to the probe request so the component becomes live:
1043        let mut live_future = spawn(handle.live());
1044        assert_ready!(live_future.poll());
1045        drive_until_quiesced(&mut registry);
1046        assert!(component_live(&registry_state, COMPONENT_ID));
1047
1048        // Shut down the runner gracefully, which returns the response receiver to the registry state:
1049        let _ = shutdown_tx.send(());
1050        assert_ready!(registry.poll());
1051
1052        // Advance time past the probe timeout so the last response becomes stale:
1053        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;
1054
1055        // Respawn the runner. The stale component should be reset to Unknown:
1056        let registry = HealthRegistry {
1057            inner: Arc::clone(&registry_state),
1058        };
1059        let runner = registry
1060            .into_runner()
1061            .expect("should be able to respawn after shutdown");
1062        let _runner_state = runner.state();
1063        let mut registry = spawn(runner.run(std::future::pending()));
1064        drive_until_quiesced(&mut registry);
1065
1066        // The component's health should have been reset to Unknown since its last response is stale:
1067        assert!(!component_live(&registry_state, COMPONENT_ID));
1068    }
1069}