saluki_health/
lib.rs

1#[cfg(test)]
2use std::sync::atomic::AtomicUsize;
3use std::{
4    collections::HashSet,
5    sync::{
6        atomic::{AtomicBool, Ordering::Relaxed},
7        Arc, Mutex,
8    },
9    time::{Duration, Instant},
10};
11
12use futures::StreamExt as _;
13use saluki_error::{generic_error, GenericError};
14use saluki_metrics::static_metrics;
15use stringtheory::MetaString;
16use tokio::{
17    select,
18    sync::{
19        mpsc::{self, error::TrySendError},
20        Notify,
21    },
22    task::JoinHandle,
23};
24use tokio_util::time::{delay_queue::Key, DelayQueue};
25use tracing::{debug, info, trace};
26
27mod api;
28pub use self::api::HealthAPIHandler;
29
// How long an outstanding liveness probe may go unanswered before its entry in the runner's timeout queue expires
// and the component is marked as not live. Also reported as the liveness latency for components that are not live.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
// Delay before the next liveness probe is sent to a component after its previous probe was answered or timed out.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
32
33/// A handle for updating the health of a component.
34pub struct Health {
35    shared: Arc<SharedComponentState>,
36    request_rx: mpsc::Receiver<LivenessRequest>,
37    response_tx: mpsc::Sender<LivenessResponse>,
38}
39
40impl Health {
41    /// Marks the component as ready.
42    pub fn mark_ready(&mut self) {
43        self.update_readiness(true);
44    }
45
46    /// Marks the component as not ready.
47    pub fn mark_not_ready(&mut self) {
48        self.update_readiness(false);
49    }
50
51    fn update_readiness(&self, ready: bool) {
52        self.shared.ready.store(ready, Relaxed);
53        self.shared.telemetry.update_readiness(ready);
54    }
55
56    /// Waits for a liveness probe to be sent to the component, and then responds to it.
57    ///
58    /// This should generally be polled as part of a `select!` block to ensure it is checked alongside other
59    /// asynchronous operations.
60    pub async fn live(&mut self) {
61        // Simply wait for the health registry to send us a liveness probe, and if we receive one, we respond back to it
62        // immediately.
63        if let Some(request) = self.request_rx.recv().await {
64            let response = request.into_response();
65            let _ = self.response_tx.send(response).await;
66        }
67    }
68}
69
/// Liveness state tracked by the registry for a single component.
#[derive(Clone, Copy, Eq, PartialEq)]
enum HealthState {
    /// Set by `ComponentState::mark_live`; reported as `1.0` on the liveness gauge.
    Live,
    /// Initial state of a component, and the state set by `ComponentState::mark_not_live`; reported as `0.0`.
    Unknown,
    /// Set by `ComponentState::mark_dead`; reported as `-1.0` on the liveness gauge.
    Dead,
}
76
77static_metrics!(
78    name => Telemetry,
79    prefix => health,
80    labels => [component_id: Arc<str>],
81    metrics => [
82        gauge(component_ready),
83        gauge(component_live),
84        trace_histogram(component_liveness_latency_seconds),
85    ]
86);
87
88impl Telemetry {
89    fn from_name(name: &str) -> Self {
90        Self::new(Arc::from(name))
91    }
92
93    fn update_readiness(&self, ready: bool) {
94        self.component_ready().set(if ready { 1.0 } else { 0.0 });
95    }
96
97    fn update_liveness(&self, state: HealthState, response_latency: Duration) {
98        let live = match state {
99            HealthState::Live => 1.0,
100            HealthState::Unknown => 0.0,
101            HealthState::Dead => -1.0,
102        };
103
104        self.component_live().set(live);
105        self.component_liveness_latency_seconds()
106            .record(response_latency.as_secs_f64());
107    }
108}
109
/// State shared between a component's `Health` handle and the registry's `ComponentState`.
struct SharedComponentState {
    // Readiness flag: written through the `Health` handle, read by `ComponentState::is_ready`.
    ready: AtomicBool,
    // Per-component metrics handle used for both readiness and liveness updates.
    telemetry: Telemetry,
}
114
115struct ComponentState {
116    name: MetaString,
117    health: HealthState,
118    shared: Arc<SharedComponentState>,
119    request_tx: mpsc::Sender<LivenessRequest>,
120    last_response: Instant,
121    last_response_latency: Duration,
122}
123
124impl ComponentState {
125    fn new(name: MetaString, response_tx: mpsc::Sender<LivenessResponse>) -> (Self, Health) {
126        let shared = Arc::new(SharedComponentState {
127            ready: AtomicBool::new(false),
128            telemetry: Telemetry::from_name(&name),
129        });
130        let (request_tx, request_rx) = mpsc::channel(1);
131
132        let state = Self {
133            name,
134            health: HealthState::Unknown,
135            shared: Arc::clone(&shared),
136            request_tx,
137            last_response: Instant::now(),
138            last_response_latency: Duration::from_secs(0),
139        };
140
141        let handle = Health {
142            shared,
143            request_rx,
144            response_tx,
145        };
146
147        (state, handle)
148    }
149
150    fn is_ready(&self) -> bool {
151        // We consider a component ready if it's marked as ready (duh) and it's not dead.
152        //
153        // Being "dead" is a special case as it means the component is very likely not even running at all, not just
154        // responding slowly or deadlocked. In these cases, it can't possibly be ready since it's not even running.
155        self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
156    }
157
158    fn is_live(&self) -> bool {
159        self.health == HealthState::Live
160    }
161
162    fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
163        self.health = HealthState::Live;
164        self.last_response = response_sent;
165        self.last_response_latency = response_latency;
166        self.shared.telemetry.update_liveness(self.health, response_latency);
167    }
168
169    fn mark_not_live(&mut self) {
170        self.health = HealthState::Unknown;
171
172        // We use the default timeout as the latency for when the component is not considered alive.
173        self.shared
174            .telemetry
175            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
176    }
177
178    fn mark_dead(&mut self) {
179        self.health = HealthState::Dead;
180
181        // We use the default timeout as the latency for when the component is not considered alive.
182        self.shared
183            .telemetry
184            .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
185    }
186}
187
188struct LivenessRequest {
189    component_id: usize,
190    timeout_key: Key,
191    request_sent: Instant,
192}
193
194impl LivenessRequest {
195    fn new(component_id: usize, timeout_key: Key) -> Self {
196        Self {
197            component_id,
198            timeout_key,
199            request_sent: Instant::now(),
200        }
201    }
202
203    fn into_response(self) -> LivenessResponse {
204        LivenessResponse {
205            request: self,
206            response_sent: Instant::now(),
207        }
208    }
209}
210
/// A component's answer to a liveness probe, produced by `LivenessRequest::into_response`.
struct LivenessResponse {
    // The request being answered, carried back so the runner can recover the component ID, timeout key, and send time.
    request: LivenessRequest,
    // When the response was created.
    response_sent: Instant,
}
215
/// A liveness transition to apply to a component's registry state.
enum HealthUpdate {
    /// The component answered a probe.
    Alive {
        /// When the component sent its response.
        last_response: Instant,
        /// Time between the probe being created and the response being created.
        last_response_latency: Duration,
    },
    /// The component's liveness is not currently known.
    Unknown,
    /// The component is considered dead.
    Dead,
}

impl HealthUpdate {
    /// Returns a short, static, lowercase label for the update, used as the `status` field when logging.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Dead => "dead",
            Self::Unknown => "unknown",
            Self::Alive { .. } => "alive",
        }
    }
}
234
235struct RegistryState {
236    registered_components: HashSet<MetaString>,
237    component_state: Vec<ComponentState>,
238    responses_tx: mpsc::Sender<LivenessResponse>,
239    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
240    pending_components: Vec<usize>,
241    pending_components_notify: Arc<Notify>,
242}
243
244impl RegistryState {
245    fn new() -> Self {
246        let (responses_tx, responses_rx) = mpsc::channel(16);
247
248        Self {
249            registered_components: HashSet::new(),
250            component_state: Vec::new(),
251            responses_tx,
252            responses_rx: Some(responses_rx),
253            pending_components: Vec::new(),
254            pending_components_notify: Arc::new(Notify::new()),
255        }
256    }
257}
258
259/// A registry of components and their health.
260///
261/// `HealthRegistry` is responsible for tracking the health of all registered components, by storing both their
262/// readiness, which indicates whether or not they are initialized and generally ready to process data, as well as
263/// probing their liveness, which indicates if they're currently responding, or able to respond, to requests.
264///
265/// # Telemetry
266///
267/// The health registry emits some internal telemetry about the status of registered components. In particular, three
268/// metrics are emitted:
269///
270/// - `health.component_ready`: whether or not a component is ready (`gauge`, `0` for not ready, `1` for ready)
271/// - `health.component_alive`: whether or not a component is alive (`gauge`, `0` for not alive/unknown, `1` for alive, `-1` for dead)
272/// - `health.component_liveness_latency_secs`: the response latency of the component for liveness probes (`histogram`,
273///   in seconds)
274///
275/// All metrics have a `component_id` tag that corresponds to the name of the component that was given when registering it.
276#[derive(Clone)]
277pub struct HealthRegistry {
278    inner: Arc<Mutex<RegistryState>>,
279}
280
281impl HealthRegistry {
282    /// Creates an empty registry.
283    pub fn new() -> Self {
284        Self {
285            inner: Arc::new(Mutex::new(RegistryState::new())),
286        }
287    }
288
289    #[cfg(test)]
290    fn state(&self) -> Arc<Mutex<RegistryState>> {
291        Arc::clone(&self.inner)
292    }
293
294    /// Registers a component with the registry.
295    ///
296    /// A handle is returned that must be used by the component to set its readiness as well as respond to liveness
297    /// probes. See [`Health::mark_ready`], [`Health::mark_not_ready`], and [`Health::live`] for more information.
298    pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
299        let mut inner = self.inner.lock().unwrap();
300
301        // Make sure we don't already have this component registered.
302        let name = name.into();
303        if !inner.registered_components.insert(name.clone()) {
304            return None;
305        }
306
307        // Add the component state.
308        let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone());
309        let component_id = inner.component_state.len();
310        inner.component_state.push(state);
311
312        debug!(component_id, "Registered component '{}'.", name);
313
314        // Mark ourselves as having a pending component that needs to be scheduled.
315        inner.pending_components.push(component_id);
316        inner.pending_components_notify.notify_one();
317
318        Some(handle)
319    }
320
321    /// Gets an API handler for reporting the health of all components.
322    ///
323    /// This handler exposes routes for querying the readiness and liveness of all registered components. See
324    /// [`HealthAPIHandler`] for more information about routes and responses.
325    pub fn api_handler(&self) -> HealthAPIHandler {
326        HealthAPIHandler::from_state(Arc::clone(&self.inner))
327    }
328
329    /// Returns `true` if all components are ready.
330    pub fn all_ready(&self) -> bool {
331        let inner = self.inner.lock().unwrap();
332
333        for component in &inner.component_state {
334            if !component.is_ready() {
335                return false;
336            }
337        }
338
339        true
340    }
341
342    fn into_runner(self) -> Result<Runner, GenericError> {
343        // Make sure the runner hasn't already been spawned.
344        let (responses_rx, pending_components_notify) = {
345            let mut inner = self.inner.lock().unwrap();
346            let responses_rx = match inner.responses_rx.take() {
347                Some(rx) => rx,
348                None => return Err(generic_error!("health registry already spawned")),
349            };
350
351            let pending_components_notify = Arc::clone(&inner.pending_components_notify);
352            (responses_rx, pending_components_notify)
353        };
354
355        Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
356    }
357
358    /// Spawns the health registry runner, which manages the scheduling and collection of liveness probes.
359    ///
360    /// # Errors
361    ///
362    /// If the health registry has already been spawned, an error will be returned.
363    pub async fn spawn(self) -> Result<JoinHandle<()>, GenericError> {
364        let runner = self.into_runner()?;
365        Ok(tokio::spawn(runner.run()))
366    }
367}
368
/// Test-only counters mirroring how many entries the runner has outstanding in its two delay queues.
#[cfg(test)]
struct RunnerState {
    /// Number of probes scheduled but not yet sent.
    pending_scheduled_probes: AtomicUsize,
    /// Number of probe timeouts registered but not yet fired or cleared.
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates a state with both counters at zero.
    fn new() -> Self {
        Self {
            pending_probe_timeouts: AtomicUsize::new(0),
            pending_scheduled_probes: AtomicUsize::new(0),
        }
    }

    /// Current number of scheduled probes.
    fn pending_scheduled_probes(&self) -> usize {
        let counter = &self.pending_scheduled_probes;
        counter.load(Relaxed)
    }

    /// Current number of pending probe timeouts.
    fn pending_probe_timeouts(&self) -> usize {
        let counter = &self.pending_probe_timeouts;
        counter.load(Relaxed)
    }

    /// Adds one to the scheduled probe counter.
    fn increment_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_add(1, Relaxed);
    }

    /// Adds one to the pending probe timeout counter.
    fn increment_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the scheduled probe counter.
    fn decrement_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_sub(1, Relaxed);
    }

    /// Subtracts one from the pending probe timeout counter.
    fn decrement_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_sub(1, Relaxed);
    }
}
408
409struct Runner {
410    registry: Arc<Mutex<RegistryState>>,
411    pending_probes: DelayQueue<usize>,
412    pending_timeouts: DelayQueue<usize>,
413    responses_rx: mpsc::Receiver<LivenessResponse>,
414    pending_components_notify: Arc<Notify>,
415    #[cfg(test)]
416    state: Arc<RunnerState>,
417}
418
419impl Runner {
420    fn new(
421        registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
422        pending_components_notify: Arc<Notify>,
423    ) -> Self {
424        #[cfg(test)]
425        let state = Arc::new(RunnerState::new());
426
427        Self {
428            registry,
429            pending_probes: DelayQueue::new(),
430            pending_timeouts: DelayQueue::new(),
431            responses_rx,
432            pending_components_notify,
433            #[cfg(test)]
434            state,
435        }
436    }
437
438    #[cfg(test)]
439    fn state(&self) -> Arc<RunnerState> {
440        Arc::clone(&self.state)
441    }
442
443    fn drain_pending_components(&mut self) -> Vec<usize> {
444        // Drain all pending components.
445        let mut registry = self.registry.lock().unwrap();
446        registry.pending_components.drain(..).collect()
447    }
448
449    fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
450        let mut registry = self.registry.lock().unwrap();
451        let component_state = &mut registry.component_state[component_id];
452
453        // Check if our component is already dead, in which case we don't need to send a liveness probe.
454        if component_state.request_tx.is_closed() {
455            debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
456            return Some(HealthUpdate::Dead);
457        }
458
459        trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
460
461        // Our component _isn't_ dead, so try to send a liveness probe to it.
462        //
463        // We'll register an entry in `pending_timeouts` that automatically marks the component as not live if we don't
464        // receive a response to the liveness probe within the timeout duration.
465        let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
466
467        #[cfg(test)]
468        self.state.increment_pending_probe_timeouts();
469
470        let request = LivenessRequest::new(component_id, timeout_key);
471        if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
472            debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
473
474            // We failed to send the probe to the component due to the component being dead. We'll drop our pending
475            // timeout as we're going to mark this component dead right now.
476            //
477            // When our send fails due to the channel being full, that's OK: it means it's going to be handled by an
478            // existing timeout and will be probed again later.
479            self.pending_timeouts.remove(&request.timeout_key);
480
481            #[cfg(test)]
482            self.state.decrement_pending_probe_timeouts();
483
484            return Some(HealthUpdate::Dead);
485        }
486
487        None
488    }
489
490    fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
491        #[cfg(test)]
492        self.state.increment_pending_scheduled_probes();
493
494        self.pending_probes.insert(component_id, duration);
495    }
496
497    fn handle_component_probe_response(&mut self, response: LivenessResponse) {
498        let component_id = response.request.component_id;
499        let timeout_key = response.request.timeout_key;
500        let request_sent = response.request.request_sent;
501        let response_sent = response.response_sent;
502        let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
503
504        // Clear any pending timeouts for this component and schedule the next probe.
505        let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
506        if !timeout_was_pending {
507            let mut registry = self.registry.lock().unwrap();
508            let component_state = &mut registry.component_state[component_id];
509
510            debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
511        }
512
513        // Update the component's health to show as alive.
514        let update = HealthUpdate::Alive {
515            last_response: response_sent,
516            last_response_latency: response_latency,
517        };
518        self.process_component_health_update(component_id, update);
519
520        // Only schedule the next probe if we successfully removed the timeout, meaning it hadn't fired yet.
521        // This prevents duplicate probe scheduling when a response arrives after a timeout.
522        if timeout_was_pending {
523            #[cfg(test)]
524            self.state.decrement_pending_probe_timeouts();
525
526            self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
527        }
528    }
529
530    fn handle_component_timeout(&mut self, component_id: usize) {
531        // Update the component's health to show as not alive.
532        self.process_component_health_update(component_id, HealthUpdate::Unknown);
533
534        // Schedule the next probe for this component.
535        self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
536    }
537
538    fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
539        // Update the component's health state based on the given update.
540        let mut registry = self.registry.lock().unwrap();
541        let component_state = &mut registry.component_state[component_id];
542        trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
543
544        match update {
545            HealthUpdate::Alive {
546                last_response,
547                last_response_latency,
548            } => component_state.mark_live(last_response, last_response_latency),
549            HealthUpdate::Unknown => component_state.mark_not_live(),
550            HealthUpdate::Dead => component_state.mark_dead(),
551        }
552    }
553
554    async fn run(mut self) {
555        info!("Health checker running.");
556
557        loop {
558            select! {
559                // A component has been scheduled to have a liveness probe sent to it.
560                Some(entry) = self.pending_probes.next() => {
561                    #[cfg(test)]
562                    self.state.decrement_pending_scheduled_probes();
563
564                    let component_id = entry.into_inner();
565                    if let Some(health_update) = self.send_component_probe_request(component_id) {
566                        // If we got a health update for this component, that means we detected that it's dead, so we need
567                        // to do an out-of-band update to its health.
568                        self.process_component_health_update(component_id, health_update);
569                    }
570                },
571
572                // A component's outstanding liveness probe has expired.
573                Some(entry) = self.pending_timeouts.next() => {
574                    #[cfg(test)]
575                    self.state.decrement_pending_probe_timeouts();
576
577                    let component_id = entry.into_inner();
578                    self.handle_component_timeout(component_id);
579                },
580
581                // A probe response has been received.
582                Some(response) = self.responses_rx.recv() => {
583                    self.handle_component_probe_response(response);
584                },
585
586                // A component is pending finalization of their registration.
587                _ = self.pending_components_notify.notified() => {
588                    // Drain all pending components, give them a clean initial state of "unknown", and immediately schedule a probe for them.
589                    let pending_component_ids = self.drain_pending_components();
590                    for pending_component_id in pending_component_ids {
591                        self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
592                        self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
593                    }
594                },
595            }
596        }
597    }
598}
599
#[cfg(test)]
mod tests {
    use std::future::Future;

    use tokio_test::{
        assert_pending, assert_ready,
        task::{spawn, Spawn},
    };

    use super::*;

    const COMPONENT_ID: &str = "test_component";

    /// Builds a registry with a single registered component and wraps the registry runner in a test task.
    ///
    /// Returns the component's health handle, the (not yet polled) runner task, the shared registry state, and the
    /// runner's test-only counters.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        // Add our component to the registry:
        let handle = registry.register_component(component_id).unwrap();

        // Extract the registry runner and wrap its `run` future in a test task.
        //
        // The task is not polled here: callers drive it themselves (e.g. via `drive_until_quiesced`) to have the
        // pending registration processed and the initial probe request sent to the component:
        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();
        let registry_task = spawn(runner.run());

        (handle, registry_task, registry_state, runner_state)
    }

    /// Polls the task, asserting it stays pending, and keeps polling for as long as it reports having been woken.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }

    /// Returns whether the named component is currently considered live. Panics if no such component is registered.
    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
        let state = state.lock().unwrap();
        state
            .component_state
            .iter()
            .find(|state| state.name == component_id)
            .map(|state| state.is_live())
            .unwrap()
    }

    #[test]
    fn basic_registration() {
        let registry = HealthRegistry::new();
        assert!(registry.register_component(COMPONENT_ID).is_some());
    }

    #[test]
    fn duplicate_component_registration_fails() {
        let registry = HealthRegistry::new();

        // Registering the same component twice should fail:
        assert!(registry.register_component(COMPONENT_ID).is_some());
        assert!(registry.register_component(COMPONENT_ID).is_none());
    }

    #[tokio::test]
    async fn duplicate_registry_spawn_fails() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();

        // Clones share the same underlying state, so only the first spawn can succeed:
        assert!(registry.spawn().await.is_ok());
        assert!(registry2.spawn().await.is_err());
    }

    #[test]
    fn readiness() {
        let registry = HealthRegistry::new();
        assert!(registry.all_ready());

        // Components should start out as not ready, so adding this component changes the registry to not ready overall:
        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
        assert!(!registry.all_ready());

        // Now mark the component as ready:
        handle.mark_ready();
        assert!(registry.all_ready());

        // Now mark the component as not ready:
        handle.mark_not_ready();
        assert!(!registry.all_ready());
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        // Create our registry with a registered component:
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
        // which means the component's registration hasn't been processed yet and no probe request has been sent:
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Drive our registry task until it is quiesced to ensure the component is registered and that a probe request is sent:
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
        assert!(!component_live(&registry_state, COMPONENT_ID));

        // After polling the registry task, we should have sent a probe request which will have now woken up our `live` future.
        //
        // Poll the future which should then respond to the probe request:
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        // The registry task should have been woken by the probe response.
        //
        // Drive the registry task until it is quiesced and ensure that the component is now live:
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // Since the probe response was received, we should have a pending scheduled probe now since this is a "normal" probe now,
        // and isn't the initial probe request which is scheduled immediately:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        // Create our registry with a registered component:
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Manually create our `live` call and ensure that it's not ready yet, as the registry task has not yet been driven,
        // which means the component's registration hasn't been processed yet and no probe request has been sent:
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Drive our registry task until it is quiesced to ensure the component is registered and that a probe request is sent:
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Ensure our component is not live since, despite being registered, we haven't received a probe response for it yet:
        assert!(!component_live(&registry_state, COMPONENT_ID));

        // After polling the registry task, we should have sent a probe request which will have now woken up
        // our `live` future, but we won't yet poll it. In fact, we'll advance time _past_ the probe timeout to simulate
        // the probe timeout expiring:
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // The registry task should have been woken by the probe timeout expiring.
        //
        // Drive the registry task until it is quiesced and ensure that the component is still not live:
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // Since the probe response was not received, we should have a pending scheduled probe now since this is a "normal" probe now,
        // and isn't the initial probe request which is scheduled immediately:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // Now, we'll actually drive the `live` future to respond to the probe request, which should mark the component as live:
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // However, since the first probe response timed out, and we haven't yet fired off our scheduled probe, receiving this late
        // response should not trigger the scheduling of another probe:
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }
}
792}