1use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7 collections::HashSet,
8 sync::{
9 atomic::{AtomicBool, Ordering::Relaxed},
10 Arc, Mutex,
11 },
12 time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::time::Instant;
20use tokio::{
21 select,
22 sync::{
23 mpsc::{self, error::TrySendError},
24 Notify,
25 },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long the runner waits for a response to a liveness probe before treating the probe as timed out.
///
/// Also used as the staleness threshold when a runner starts, and as the latency recorded for components that are
/// marked not live or dead.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
/// Delay before the next liveness probe is scheduled for a component after a probe response or a probe timeout.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
38
/// Handle given to a registered component for reporting readiness and answering liveness probes.
pub struct Health {
    /// State shared with the registry's entry for this component (readiness flag and telemetry).
    shared: Arc<SharedComponentState>,
    /// Receives liveness probe requests addressed to this component.
    request_rx: mpsc::Receiver<LivenessRequest>,
    /// Sends liveness probe responses back to the registry.
    response_tx: mpsc::Sender<LivenessResponse>,
    /// Notified whenever this component is marked ready, so that readiness waiters re-check.
    readiness_notify: Arc<Notify>,
}
46
47impl Health {
48 pub fn mark_ready(&mut self) {
50 self.update_readiness(true);
51 }
52
53 pub fn mark_not_ready(&mut self) {
55 self.update_readiness(false);
56 }
57
58 fn update_readiness(&self, ready: bool) {
59 self.shared.ready.store(ready, Relaxed);
60 self.shared.telemetry.update_readiness(ready);
61
62 if ready {
64 self.readiness_notify.notify_waiters();
65 }
66 }
67
68 pub async fn live(&mut self) {
73 if let Some(request) = self.request_rx.recv().await {
76 let response = request.into_response();
77 let _ = self.response_tx.send(response).await;
78 }
79 }
80}
81
/// Liveness state of a component as tracked by the registry.
#[derive(Clone, Copy, Eq, PartialEq)]
enum HealthState {
    /// The component answered a liveness probe.
    Live,
    /// Liveness is not established; this is the initial state, and the state after a probe times out.
    Unknown,
    /// The component's probe request channel was found closed.
    Dead,
}
88
// Declares `Telemetry`: per-component health metrics labeled by `component_id`, consisting of a readiness gauge, a
// liveness gauge, and a histogram of liveness probe response latency in seconds.
// NOTE(review): the final emitted metric names depend on how `static_metrics!` applies the `health` prefix — confirm
// against `saluki_metrics`.
static_metrics!(
    name => Telemetry,
    prefix => health,
    labels => [component_id: Arc<str>],
    metrics => [
        gauge(component_ready),
        gauge(component_live),
        trace_histogram(component_liveness_latency_seconds),
    ]
);
99
100impl Telemetry {
101 fn from_name(name: &str) -> Self {
102 Self::new(Arc::from(name))
103 }
104
105 fn update_readiness(&self, ready: bool) {
106 self.component_ready().set(if ready { 1.0 } else { 0.0 });
107 }
108
109 fn update_liveness(&self, state: HealthState, response_latency: Duration) {
110 let live = match state {
111 HealthState::Live => 1.0,
112 HealthState::Unknown => 0.0,
113 HealthState::Dead => -1.0,
114 };
115
116 self.component_live().set(live);
117 self.component_liveness_latency_seconds()
118 .record(response_latency.as_secs_f64());
119 }
120}
121
/// State shared between a component's `Health` handle and the registry's `ComponentState` entry.
struct SharedComponentState {
    /// Readiness flag written through the `Health` handle and read by the registry.
    ready: AtomicBool,
    /// Metrics handle for this component.
    telemetry: Telemetry,
}
126
/// Registry-side bookkeeping for a single registered component.
struct ComponentState {
    /// Name the component was registered under.
    name: MetaString,
    /// Current liveness state.
    health: HealthState,
    /// State shared with the component's `Health` handle.
    shared: Arc<SharedComponentState>,
    /// Sends liveness probe requests to the component's `Health` handle.
    request_tx: mpsc::Sender<LivenessRequest>,
    /// When the most recent probe response was sent; initialized to the creation time of this state.
    last_response: Instant,
    /// Latency of the most recent probe response; initialized to zero.
    last_response_latency: Duration,
}
135
136impl ComponentState {
137 fn new(
138 name: MetaString, response_tx: mpsc::Sender<LivenessResponse>, readiness_notify: Arc<Notify>,
139 ) -> (Self, Health) {
140 let shared = Arc::new(SharedComponentState {
141 ready: AtomicBool::new(false),
142 telemetry: Telemetry::from_name(&name),
143 });
144 let (request_tx, request_rx) = mpsc::channel(1);
145
146 let state = Self {
147 name,
148 health: HealthState::Unknown,
149 shared: Arc::clone(&shared),
150 request_tx,
151 last_response: Instant::now(),
152 last_response_latency: Duration::from_secs(0),
153 };
154
155 let handle = Health {
156 shared,
157 request_rx,
158 response_tx,
159 readiness_notify,
160 };
161
162 (state, handle)
163 }
164
165 fn is_ready(&self) -> bool {
166 self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
171 }
172
173 fn is_live(&self) -> bool {
174 self.health == HealthState::Live
175 }
176
177 fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
178 self.health = HealthState::Live;
179 self.last_response = response_sent;
180 self.last_response_latency = response_latency;
181 self.shared.telemetry.update_liveness(self.health, response_latency);
182 }
183
184 fn mark_not_live(&mut self) {
185 self.health = HealthState::Unknown;
186
187 self.shared
189 .telemetry
190 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191 }
192
193 fn mark_dead(&mut self) {
194 self.health = HealthState::Dead;
195
196 self.shared
198 .telemetry
199 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
200 }
201}
202
/// A liveness probe sent from the runner to a component.
struct LivenessRequest {
    /// ID of the probed component (its index in the registry's component list).
    component_id: usize,
    /// Key of the timeout entry tracking this probe in the runner's timeout queue.
    timeout_key: Key,
    /// When the request was created.
    request_sent: Instant,
}
208
209impl LivenessRequest {
210 fn new(component_id: usize, timeout_key: Key) -> Self {
211 Self {
212 component_id,
213 timeout_key,
214 request_sent: Instant::now(),
215 }
216 }
217
218 fn into_response(self) -> LivenessResponse {
219 LivenessResponse {
220 request: self,
221 response_sent: Instant::now(),
222 }
223 }
224}
225
/// A component's answer to a liveness probe.
struct LivenessResponse {
    /// The request being answered.
    request: LivenessRequest,
    /// When the response was created.
    response_sent: Instant,
}
230
/// A liveness change to apply to a component's registry state.
enum HealthUpdate {
    /// The component answered a probe.
    Alive {
        /// When the response was sent.
        last_response: Instant,
        /// Time between the request being sent and the response being sent.
        last_response_latency: Duration,
    },
    /// Liveness should be reset to unknown.
    Unknown,
    /// The component should be marked dead.
    Dead,
}
239
240impl HealthUpdate {
241 fn as_str(&self) -> &'static str {
242 match self {
243 HealthUpdate::Alive { .. } => "alive",
244 HealthUpdate::Unknown => "unknown",
245 HealthUpdate::Dead => "dead",
246 }
247 }
248}
249
/// Mutable state behind a `HealthRegistry`.
struct RegistryState {
    /// Names that have already been registered; used to reject duplicate registrations.
    registered_components: HashSet<MetaString>,
    /// Per-component state, indexed by component ID.
    component_state: Vec<ComponentState>,
    /// Sender half of the probe response channel; cloned into every `Health` handle.
    responses_tx: mpsc::Sender<LivenessResponse>,
    /// Receiver half of the probe response channel; `None` while a runner has taken it.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
    /// IDs of components that were registered but not yet picked up by the runner.
    pending_components: Vec<usize>,
    /// Signaled each time a component is added to `pending_components`.
    pending_components_notify: Arc<Notify>,
    /// Shared with every `Health` handle; notified when a component is marked ready.
    readiness_notify: Arc<Notify>,
}
259
260impl RegistryState {
261 fn new() -> Self {
262 let (responses_tx, responses_rx) = mpsc::channel(16);
263
264 Self {
265 registered_components: HashSet::new(),
266 component_state: Vec::new(),
267 responses_tx,
268 responses_rx: Some(responses_rx),
269 pending_components: Vec::new(),
270 pending_components_notify: Arc::new(Notify::new()),
271 readiness_notify: Arc::new(Notify::new()),
272 }
273 }
274}
275
/// A registry of components whose readiness and liveness are tracked.
///
/// Cloning is cheap: every clone refers to the same underlying state.
#[derive(Clone)]
pub struct HealthRegistry {
    /// Registry state shared by all clones, the API handler, and the runner.
    inner: Arc<Mutex<RegistryState>>,
}
297
298impl HealthRegistry {
299 pub fn new() -> Self {
301 Self {
302 inner: Arc::new(Mutex::new(RegistryState::new())),
303 }
304 }
305
306 #[cfg(test)]
307 fn state(&self) -> Arc<Mutex<RegistryState>> {
308 Arc::clone(&self.inner)
309 }
310
311 pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
316 let mut inner = self.inner.lock().unwrap();
317
318 let name = name.into();
320 if !inner.registered_components.insert(name.clone()) {
321 return None;
322 }
323
324 let readiness_notify = Arc::clone(&inner.readiness_notify);
326 let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone(), readiness_notify);
327 let component_id = inner.component_state.len();
328 inner.component_state.push(state);
329
330 debug!(component_id, "Registered component '{}'.", name);
331
332 inner.pending_components.push(component_id);
334 inner.pending_components_notify.notify_one();
335
336 Some(handle)
337 }
338
339 pub fn api_handler(&self) -> HealthAPIHandler {
344 HealthAPIHandler::from_state(Arc::clone(&self.inner))
345 }
346
347 pub async fn all_ready(&self) {
355 let readiness_notify = {
356 let inner = self.inner.lock().unwrap();
357 Arc::clone(&inner.readiness_notify)
358 };
359
360 loop {
361 let notified = readiness_notify.notified();
363
364 if self.check_all_ready() {
365 return;
366 }
367
368 notified.await;
369 }
370 }
371
372 fn check_all_ready(&self) -> bool {
373 let inner = self.inner.lock().unwrap();
374 inner.component_state.iter().all(|component| component.is_ready())
375 }
376
377 pub fn worker(&self) -> HealthRegistryWorker {
382 HealthRegistryWorker::new(self.clone())
383 }
384
385 pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
386 let (responses_rx, pending_components_notify) = {
388 let mut inner = self.inner.lock().unwrap();
389 let responses_rx = match inner.responses_rx.take() {
390 Some(rx) => rx,
391 None => return Err(generic_error!("health registry already spawned")),
392 };
393
394 let pending_components_notify = Arc::clone(&inner.pending_components_notify);
395 (responses_rx, pending_components_notify)
396 };
397
398 Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
399 }
400}
401
/// Holds the probe response receiver on behalf of a `Runner` and returns it to the registry state when dropped.
struct RunnerGuard {
    /// Registry state the receiver is returned to.
    registry: Arc<Mutex<RegistryState>>,
    /// The receiver, when the guard currently holds it; `None` while it has been taken out.
    responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
}
411
412impl Drop for RunnerGuard {
413 fn drop(&mut self) {
414 if let Some(rx) = self.responses_rx.take() {
415 let mut inner = self.registry.lock().expect("registry state poisoned");
416 inner.responses_rx = Some(rx);
417 debug!("Returned response receiver to registry state.");
418 }
419 }
420}
421
/// Test-only counters mirroring how many entries the runner has in its probe and timeout queues.
#[cfg(test)]
struct RunnerState {
    /// Number of probes scheduled but not yet sent.
    pending_scheduled_probes: AtomicUsize,
    /// Number of sent probes still awaiting a response or a timeout.
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates a state with both counters at zero.
    fn new() -> Self {
        Self {
            pending_scheduled_probes: AtomicUsize::default(),
            pending_probe_timeouts: AtomicUsize::default(),
        }
    }

    /// Returns the current number of scheduled probes.
    fn pending_scheduled_probes(&self) -> usize {
        let counter = &self.pending_scheduled_probes;
        counter.load(Relaxed)
    }

    /// Returns the current number of outstanding probe timeouts.
    fn pending_probe_timeouts(&self) -> usize {
        let counter = &self.pending_probe_timeouts;
        counter.load(Relaxed)
    }

    /// Adds one to the scheduled probe counter.
    fn increment_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_add(1, Relaxed);
    }

    /// Adds one to the outstanding probe timeout counter.
    fn increment_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the scheduled probe counter.
    fn decrement_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_sub(1, Relaxed);
    }

    /// Subtracts one from the outstanding probe timeout counter.
    fn decrement_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_sub(1, Relaxed);
    }
}
461
/// Drives liveness probing for every component in a registry.
pub(super) struct Runner {
    /// Registry state shared with the `HealthRegistry`.
    registry: Arc<Mutex<RegistryState>>,
    /// Component IDs waiting for their next probe; an entry expiring means the probe should be sent.
    pending_probes: DelayQueue<usize>,
    /// Component IDs with a probe in flight; an entry expiring means the probe timed out.
    pending_timeouts: DelayQueue<usize>,
    /// Returns the response receiver to the registry state when the runner is dropped.
    guard: RunnerGuard,
    /// Signaled when new components have been registered.
    pending_components_notify: Arc<Notify>,
    /// Test-only counters mirroring the sizes of the two queues.
    #[cfg(test)]
    state: Arc<RunnerState>,
}
471
472impl Runner {
473 fn new(
474 registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
475 pending_components_notify: Arc<Notify>,
476 ) -> Self {
477 #[cfg(test)]
478 let state = Arc::new(RunnerState::new());
479
480 let guard = RunnerGuard {
481 registry: Arc::clone(®istry),
482 responses_rx: Some(responses_rx),
483 };
484
485 Self {
486 registry,
487 pending_probes: DelayQueue::new(),
488 pending_timeouts: DelayQueue::new(),
489 guard,
490 pending_components_notify,
491 #[cfg(test)]
492 state,
493 }
494 }
495
496 #[cfg(test)]
497 fn state(&self) -> Arc<RunnerState> {
498 Arc::clone(&self.state)
499 }
500
501 fn drain_pending_components(&mut self) -> Vec<usize> {
502 let mut registry = self.registry.lock().unwrap();
504 registry.pending_components.drain(..).collect()
505 }
506
507 fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
508 let mut registry = self.registry.lock().unwrap();
509 let component_state = &mut registry.component_state[component_id];
510
511 if component_state.request_tx.is_closed() {
513 debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
514 return Some(HealthUpdate::Dead);
515 }
516
517 trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
518
519 let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
524
525 #[cfg(test)]
526 self.state.increment_pending_probe_timeouts();
527
528 let request = LivenessRequest::new(component_id, timeout_key);
529 if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
530 debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
531
532 self.pending_timeouts.remove(&request.timeout_key);
538
539 #[cfg(test)]
540 self.state.decrement_pending_probe_timeouts();
541
542 return Some(HealthUpdate::Dead);
543 }
544
545 None
546 }
547
548 fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
549 #[cfg(test)]
550 self.state.increment_pending_scheduled_probes();
551
552 self.pending_probes.insert(component_id, duration);
553 }
554
555 fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
556 let _pending = self.drain_pending_components();
559
560 while let Ok(response) = responses_rx.try_recv() {
565 self.handle_component_probe_response(response);
566 }
567
568 let (component_count, stale_component_ids) = {
572 let registry = self.registry.lock().unwrap();
573 let now = Instant::now();
574 let stale_ids: Vec<usize> = (0..registry.component_state.len())
575 .filter(|&id| {
576 now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
577 })
578 .collect();
579 (registry.component_state.len(), stale_ids)
580 };
581
582 for &component_id in &stale_component_ids {
584 self.process_component_health_update(component_id, HealthUpdate::Unknown);
585 }
586
587 for component_id in 0..component_count {
589 self.schedule_probe_for_component(component_id, Duration::ZERO);
590 }
591
592 if component_count > 0 {
593 let fresh_count = component_count - stale_component_ids.len();
594 debug!(
595 component_count,
596 fresh_count,
597 stale_count = stale_component_ids.len(),
598 "Scheduled probes for all existing components."
599 );
600 }
601 }
602
603 fn handle_component_probe_response(&mut self, response: LivenessResponse) {
604 let component_id = response.request.component_id;
605 let timeout_key = response.request.timeout_key;
606 let request_sent = response.request.request_sent;
607 let response_sent = response.response_sent;
608 let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
609
610 let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
612 if !timeout_was_pending {
613 let mut registry = self.registry.lock().unwrap();
614 let component_state = &mut registry.component_state[component_id];
615
616 debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
617 }
618
619 let update = HealthUpdate::Alive {
621 last_response: response_sent,
622 last_response_latency: response_latency,
623 };
624 self.process_component_health_update(component_id, update);
625
626 if timeout_was_pending {
629 #[cfg(test)]
630 self.state.decrement_pending_probe_timeouts();
631
632 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
633 }
634 }
635
636 fn handle_component_timeout(&mut self, component_id: usize) {
637 self.process_component_health_update(component_id, HealthUpdate::Unknown);
639
640 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
642 }
643
644 fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
645 let mut registry = self.registry.lock().unwrap();
647 let component_state = &mut registry.component_state[component_id];
648 trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
649
650 match update {
651 HealthUpdate::Alive {
652 last_response,
653 last_response_latency,
654 } => component_state.mark_live(last_response, last_response_latency),
655 HealthUpdate::Unknown => component_state.mark_not_live(),
656 HealthUpdate::Dead => component_state.mark_dead(),
657 }
658 }
659
660 async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
661 info!("Health checker running.");
662
663 let mut responses_rx = self
666 .guard
667 .responses_rx
668 .take()
669 .expect("responses_rx should always be Some when Runner is created");
670
671 self.schedule_all_existing_components(&mut responses_rx);
675
676 let mut shutdown = std::pin::pin!(shutdown);
678
679 loop {
680 select! {
681 _ = &mut shutdown => {
683 info!("Health checker shutting down.");
684 break;
685 },
686
687 Some(entry) = self.pending_probes.next() => {
689 #[cfg(test)]
690 self.state.decrement_pending_scheduled_probes();
691
692 let component_id = entry.into_inner();
693 if let Some(health_update) = self.send_component_probe_request(component_id) {
694 self.process_component_health_update(component_id, health_update);
697 }
698 },
699
700 Some(entry) = self.pending_timeouts.next() => {
702 #[cfg(test)]
703 self.state.decrement_pending_probe_timeouts();
704
705 let component_id = entry.into_inner();
706 self.handle_component_timeout(component_id);
707 },
708
709 Some(response) = responses_rx.recv() => {
711 self.handle_component_probe_response(response);
712 },
713
714 _ = self.pending_components_notify.notified() => {
716 let pending_component_ids = self.drain_pending_components();
718 for pending_component_id in pending_component_ids {
719 self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
720 self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
721 }
722 },
723 }
724 }
725
726 self.guard.responses_rx = Some(responses_rx);
728
729 }
732}
733
#[cfg(test)]
mod tests {
    use std::future::Future;

    use futures::FutureExt as _;
    use tokio::sync::oneshot;
    use tokio_test::{
        assert_pending, assert_ready,
        task::{spawn, Spawn},
    };

    use super::*;

    const COMPONENT_ID: &str = "test_component";

    /// Builds a registry with one registered component and a spawned (but not yet polled) runner task that never
    /// shuts down.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        let handle = registry.register_component(component_id).unwrap();

        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let shutdown = std::future::pending();
        let registry_task = spawn(runner.run(shutdown));

        (handle, registry_task, registry_state, runner_state)
    }

    /// Polls the task until it stops waking itself, asserting that it stays pending throughout.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }

    /// Returns whether the named component is currently live; panics if no such component is registered.
    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
        let state = state.lock().unwrap();
        state
            .component_state
            .iter()
            .find(|state| state.name == component_id)
            .map(|state| state.is_live())
            .unwrap()
    }

    #[test]
    fn basic_registration() {
        let registry = HealthRegistry::new();
        assert!(registry.register_component(COMPONENT_ID).is_some());
    }

    #[test]
    fn duplicate_component_registration_fails() {
        let registry = HealthRegistry::new();

        // The first registration succeeds; a second one under the same name is rejected.
        assert!(registry.register_component(COMPONENT_ID).is_some());
        assert!(registry.register_component(COMPONENT_ID).is_none());
    }

    #[test]
    fn duplicate_runner_creation_fails_while_running() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();

        // Keep the first runner alive so it still holds the response receiver.
        let _runner = registry.into_runner().expect("first runner creation should succeed");

        assert!(registry2.into_runner().is_err());
    }

    #[tokio::test]
    async fn registry_can_be_respawned_after_shutdown() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();
        let registry3 = registry.clone();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let runner = registry.into_runner().expect("first runner creation should succeed");

        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));

        // Trigger shutdown and wait for the runner to finish, which returns the receiver to the registry.
        let _ = shutdown_tx.send(());

        join_handle.await.expect("runner should complete without panic");

        let _runner2 = registry2
            .into_runner()
            .expect("should be able to create runner after shutdown");

        // The second runner now holds the receiver, so a third cannot be created.
        assert!(
            registry3.into_runner().is_err(),
            "should not be able to create runner while one exists"
        );
    }

    #[test]
    fn readiness() {
        let registry = HealthRegistry::new();

        // With no components registered, everything is trivially ready.
        let mut all_ready_fut = spawn(registry.all_ready());
        assert_ready!(all_ready_fut.poll());

        let mut handle = registry.register_component(COMPONENT_ID).unwrap();

        // A freshly registered component starts out not ready.
        let mut all_ready_fut = spawn(registry.all_ready());
        assert_pending!(all_ready_fut.poll());

        handle.mark_ready();

        // Marking the component ready wakes the waiter, which then completes.
        assert!(all_ready_fut.is_woken());
        assert_ready!(all_ready_fut.poll());

        // A new waiter completes immediately while the component stays ready.
        let mut all_ready_fut = spawn(registry.all_ready());
        assert_ready!(all_ready_fut.poll());

        handle.mark_not_ready();

        let mut all_ready_fut = spawn(registry.all_ready());
        assert_pending!(all_ready_fut.poll());
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Nothing has been probed yet because the runner has not been polled.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner sends the first probe, leaving one timeout outstanding.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The component sees the probe and responds to it.
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        // The runner processes the response and marks the component live.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // The timeout was cancelled and the next probe has been scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Nothing has been probed yet because the runner has not been polled.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner sends the first probe, leaving one timeout outstanding.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The component has a probe waiting, but does not respond to it yet.
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // The timeout fires and the runner handles it.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The timeout is gone and a follow-up probe has been scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // The component finally responds to the original probe.
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        // The late response still marks the component live.
        assert!(component_live(&registry_state, COMPONENT_ID));

        // The late response does not schedule an additional probe.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    /// Like `initialize_registry_with_component`, but the runner shuts down when the returned sender fires or is
    /// dropped.
    #[track_caller]
    #[allow(clippy::type_complexity)]
    fn initialize_registry_with_component_and_shutdown(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
        oneshot::Sender<()>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();
        let handle = registry.register_component(component_id).unwrap();
        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));

        (handle, registry_task, registry_state, runner_state, shutdown_tx)
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_preserves_fresh_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Answer the first probe so the component becomes live.
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut the first runner down.
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Respawn immediately over the same state, without advancing time.
        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The last response is still fresh, so liveness is preserved across the respawn.
        assert!(component_live(&registry_state, COMPONENT_ID));
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_resets_stale_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Answer the first probe so the component becomes live.
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut the first runner down.
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Let enough time pass for the last response to become stale.
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The stale component was reset on startup and has not answered the new probe.
        assert!(!component_live(&registry_state, COMPONENT_ID));
    }
}