1use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7 collections::HashSet,
8 sync::{
9 atomic::{AtomicBool, Ordering::Relaxed},
10 Arc, Mutex,
11 },
12 time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::time::Instant;
20use tokio::{
21 select,
22 sync::{
23 mpsc::{self, error::TrySendError},
24 Notify,
25 },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long a liveness probe may stay unanswered before it is treated as timed out.
///
/// Also used as the staleness threshold when a runner starts, and as the latency recorded for components that are
/// marked not live or dead.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_millis(5_000);
/// Delay before a component is probed again after a probe response or a probe timeout.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_millis(1_000);
38
39pub struct Health {
41 shared: Arc<SharedComponentState>,
42 request_rx: mpsc::Receiver<LivenessRequest>,
43 response_tx: mpsc::Sender<LivenessResponse>,
44}
45
46impl Health {
47 pub fn mark_ready(&mut self) {
49 self.update_readiness(true);
50 }
51
52 pub fn mark_not_ready(&mut self) {
54 self.update_readiness(false);
55 }
56
57 fn update_readiness(&self, ready: bool) {
58 self.shared.ready.store(ready, Relaxed);
59 self.shared.telemetry.update_readiness(ready);
60 }
61
62 pub async fn live(&mut self) {
67 if let Some(request) = self.request_rx.recv().await {
70 let response = request.into_response();
71 let _ = self.response_tx.send(response).await;
72 }
73 }
74}
75
/// Liveness state tracked for a component.
#[derive(Copy, Clone, PartialEq, Eq)]
enum HealthState {
    /// The component answered a liveness probe.
    Live,
    /// Liveness is not currently established (initial state, or set after a probe timeout).
    Unknown,
    /// The component's probe channel was found closed.
    Dead,
}
82
83static_metrics!(
84 name => Telemetry,
85 prefix => health,
86 labels => [component_id: Arc<str>],
87 metrics => [
88 gauge(component_ready),
89 gauge(component_live),
90 trace_histogram(component_liveness_latency_seconds),
91 ]
92);
93
94impl Telemetry {
95 fn from_name(name: &str) -> Self {
96 Self::new(Arc::from(name))
97 }
98
99 fn update_readiness(&self, ready: bool) {
100 self.component_ready().set(if ready { 1.0 } else { 0.0 });
101 }
102
103 fn update_liveness(&self, state: HealthState, response_latency: Duration) {
104 let live = match state {
105 HealthState::Live => 1.0,
106 HealthState::Unknown => 0.0,
107 HealthState::Dead => -1.0,
108 };
109
110 self.component_live().set(live);
111 self.component_liveness_latency_seconds()
112 .record(response_latency.as_secs_f64());
113 }
114}
115
/// State shared between a component's `Health` handle and the registry's `ComponentState`.
struct SharedComponentState {
    /// Readiness flag; written through the `Health` handle and read by `ComponentState::is_ready`.
    ready: AtomicBool,
    /// Metrics handle for this component.
    telemetry: Telemetry,
}
120
121struct ComponentState {
122 name: MetaString,
123 health: HealthState,
124 shared: Arc<SharedComponentState>,
125 request_tx: mpsc::Sender<LivenessRequest>,
126 last_response: Instant,
127 last_response_latency: Duration,
128}
129
130impl ComponentState {
131 fn new(name: MetaString, response_tx: mpsc::Sender<LivenessResponse>) -> (Self, Health) {
132 let shared = Arc::new(SharedComponentState {
133 ready: AtomicBool::new(false),
134 telemetry: Telemetry::from_name(&name),
135 });
136 let (request_tx, request_rx) = mpsc::channel(1);
137
138 let state = Self {
139 name,
140 health: HealthState::Unknown,
141 shared: Arc::clone(&shared),
142 request_tx,
143 last_response: Instant::now(),
144 last_response_latency: Duration::from_secs(0),
145 };
146
147 let handle = Health {
148 shared,
149 request_rx,
150 response_tx,
151 };
152
153 (state, handle)
154 }
155
156 fn is_ready(&self) -> bool {
157 self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
162 }
163
164 fn is_live(&self) -> bool {
165 self.health == HealthState::Live
166 }
167
168 fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
169 self.health = HealthState::Live;
170 self.last_response = response_sent;
171 self.last_response_latency = response_latency;
172 self.shared.telemetry.update_liveness(self.health, response_latency);
173 }
174
175 fn mark_not_live(&mut self) {
176 self.health = HealthState::Unknown;
177
178 self.shared
180 .telemetry
181 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
182 }
183
184 fn mark_dead(&mut self) {
185 self.health = HealthState::Dead;
186
187 self.shared
189 .telemetry
190 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191 }
192}
193
194struct LivenessRequest {
195 component_id: usize,
196 timeout_key: Key,
197 request_sent: Instant,
198}
199
200impl LivenessRequest {
201 fn new(component_id: usize, timeout_key: Key) -> Self {
202 Self {
203 component_id,
204 timeout_key,
205 request_sent: Instant::now(),
206 }
207 }
208
209 fn into_response(self) -> LivenessResponse {
210 LivenessResponse {
211 request: self,
212 response_sent: Instant::now(),
213 }
214 }
215}
216
/// A component's answer to a liveness probe.
struct LivenessResponse {
    /// The probe being answered.
    request: LivenessRequest,
    /// When the response was created.
    response_sent: Instant,
}
221
/// A liveness change to apply to a component.
enum HealthUpdate {
    /// The component answered a probe.
    Alive {
        /// When the response was sent.
        last_response: Instant,
        /// Time between the probe being sent and the response being sent.
        last_response_latency: Duration,
    },
    /// The component's liveness should be reset to unknown.
    Unknown,
    /// The component should be marked dead.
    Dead,
}

impl HealthUpdate {
    /// Returns a short lowercase label for this update, used in log output.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Alive { .. } => "alive",
            Self::Unknown => "unknown",
            Self::Dead => "dead",
        }
    }
}
240
241struct RegistryState {
242 registered_components: HashSet<MetaString>,
243 component_state: Vec<ComponentState>,
244 responses_tx: mpsc::Sender<LivenessResponse>,
245 responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
246 pending_components: Vec<usize>,
247 pending_components_notify: Arc<Notify>,
248}
249
250impl RegistryState {
251 fn new() -> Self {
252 let (responses_tx, responses_rx) = mpsc::channel(16);
253
254 Self {
255 registered_components: HashSet::new(),
256 component_state: Vec::new(),
257 responses_tx,
258 responses_rx: Some(responses_rx),
259 pending_components: Vec::new(),
260 pending_components_notify: Arc::new(Notify::new()),
261 }
262 }
263}
264
265#[derive(Clone)]
283pub struct HealthRegistry {
284 inner: Arc<Mutex<RegistryState>>,
285}
286
287impl HealthRegistry {
288 pub fn new() -> Self {
290 Self {
291 inner: Arc::new(Mutex::new(RegistryState::new())),
292 }
293 }
294
295 #[cfg(test)]
296 fn state(&self) -> Arc<Mutex<RegistryState>> {
297 Arc::clone(&self.inner)
298 }
299
300 pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
305 let mut inner = self.inner.lock().unwrap();
306
307 let name = name.into();
309 if !inner.registered_components.insert(name.clone()) {
310 return None;
311 }
312
313 let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone());
315 let component_id = inner.component_state.len();
316 inner.component_state.push(state);
317
318 debug!(component_id, "Registered component '{}'.", name);
319
320 inner.pending_components.push(component_id);
322 inner.pending_components_notify.notify_one();
323
324 Some(handle)
325 }
326
327 pub fn api_handler(&self) -> HealthAPIHandler {
332 HealthAPIHandler::from_state(Arc::clone(&self.inner))
333 }
334
335 pub fn all_ready(&self) -> bool {
337 let inner = self.inner.lock().unwrap();
338
339 for component in &inner.component_state {
340 if !component.is_ready() {
341 return false;
342 }
343 }
344
345 true
346 }
347
348 pub fn worker(&self) -> HealthRegistryWorker {
353 HealthRegistryWorker::new(self.clone())
354 }
355
356 pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
357 let (responses_rx, pending_components_notify) = {
359 let mut inner = self.inner.lock().unwrap();
360 let responses_rx = match inner.responses_rx.take() {
361 Some(rx) => rx,
362 None => return Err(generic_error!("health registry already spawned")),
363 };
364
365 let pending_components_notify = Arc::clone(&inner.pending_components_notify);
366 (responses_rx, pending_components_notify)
367 };
368
369 Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
370 }
371}
372
373struct RunnerGuard {
379 registry: Arc<Mutex<RegistryState>>,
380 responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
381}
382
383impl Drop for RunnerGuard {
384 fn drop(&mut self) {
385 if let Some(rx) = self.responses_rx.take() {
386 let mut inner = self.registry.lock().expect("registry state poisoned");
387 inner.responses_rx = Some(rx);
388 debug!("Returned response receiver to registry state.");
389 }
390 }
391}
392
/// Test-only counters mirroring how many entries the runner holds in its two delay queues.
#[cfg(test)]
struct RunnerState {
    /// Number of probes scheduled but not yet sent.
    pending_scheduled_probes: AtomicUsize,
    /// Number of probe timeouts registered but not yet resolved.
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates a state with both counters at zero.
    fn new() -> Self {
        Self {
            pending_scheduled_probes: AtomicUsize::default(),
            pending_probe_timeouts: AtomicUsize::default(),
        }
    }

    /// Current number of scheduled probes.
    fn pending_scheduled_probes(&self) -> usize {
        let counter = &self.pending_scheduled_probes;
        counter.load(Relaxed)
    }

    /// Current number of pending probe timeouts.
    fn pending_probe_timeouts(&self) -> usize {
        let counter = &self.pending_probe_timeouts;
        counter.load(Relaxed)
    }

    /// Adds one to the scheduled-probe counter.
    fn increment_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_add(1, Relaxed);
    }

    /// Adds one to the probe-timeout counter.
    fn increment_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_add(1, Relaxed);
    }

    /// Subtracts one from the scheduled-probe counter.
    fn decrement_pending_scheduled_probes(&self) {
        let _previous = self.pending_scheduled_probes.fetch_sub(1, Relaxed);
    }

    /// Subtracts one from the probe-timeout counter.
    fn decrement_pending_probe_timeouts(&self) {
        let _previous = self.pending_probe_timeouts.fetch_sub(1, Relaxed);
    }
}
432
433pub(super) struct Runner {
434 registry: Arc<Mutex<RegistryState>>,
435 pending_probes: DelayQueue<usize>,
436 pending_timeouts: DelayQueue<usize>,
437 guard: RunnerGuard,
438 pending_components_notify: Arc<Notify>,
439 #[cfg(test)]
440 state: Arc<RunnerState>,
441}
442
443impl Runner {
444 fn new(
445 registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
446 pending_components_notify: Arc<Notify>,
447 ) -> Self {
448 #[cfg(test)]
449 let state = Arc::new(RunnerState::new());
450
451 let guard = RunnerGuard {
452 registry: Arc::clone(®istry),
453 responses_rx: Some(responses_rx),
454 };
455
456 Self {
457 registry,
458 pending_probes: DelayQueue::new(),
459 pending_timeouts: DelayQueue::new(),
460 guard,
461 pending_components_notify,
462 #[cfg(test)]
463 state,
464 }
465 }
466
467 #[cfg(test)]
468 fn state(&self) -> Arc<RunnerState> {
469 Arc::clone(&self.state)
470 }
471
472 fn drain_pending_components(&mut self) -> Vec<usize> {
473 let mut registry = self.registry.lock().unwrap();
475 registry.pending_components.drain(..).collect()
476 }
477
478 fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
479 let mut registry = self.registry.lock().unwrap();
480 let component_state = &mut registry.component_state[component_id];
481
482 if component_state.request_tx.is_closed() {
484 debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
485 return Some(HealthUpdate::Dead);
486 }
487
488 trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
489
490 let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
495
496 #[cfg(test)]
497 self.state.increment_pending_probe_timeouts();
498
499 let request = LivenessRequest::new(component_id, timeout_key);
500 if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
501 debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
502
503 self.pending_timeouts.remove(&request.timeout_key);
509
510 #[cfg(test)]
511 self.state.decrement_pending_probe_timeouts();
512
513 return Some(HealthUpdate::Dead);
514 }
515
516 None
517 }
518
519 fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
520 #[cfg(test)]
521 self.state.increment_pending_scheduled_probes();
522
523 self.pending_probes.insert(component_id, duration);
524 }
525
526 fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
527 let _pending = self.drain_pending_components();
530
531 while let Ok(response) = responses_rx.try_recv() {
536 self.handle_component_probe_response(response);
537 }
538
539 let (component_count, stale_component_ids) = {
543 let registry = self.registry.lock().unwrap();
544 let now = Instant::now();
545 let stale_ids: Vec<usize> = (0..registry.component_state.len())
546 .filter(|&id| {
547 now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
548 })
549 .collect();
550 (registry.component_state.len(), stale_ids)
551 };
552
553 for &component_id in &stale_component_ids {
555 self.process_component_health_update(component_id, HealthUpdate::Unknown);
556 }
557
558 for component_id in 0..component_count {
560 self.schedule_probe_for_component(component_id, Duration::ZERO);
561 }
562
563 if component_count > 0 {
564 let fresh_count = component_count - stale_component_ids.len();
565 debug!(
566 component_count,
567 fresh_count,
568 stale_count = stale_component_ids.len(),
569 "Scheduled probes for all existing components."
570 );
571 }
572 }
573
574 fn handle_component_probe_response(&mut self, response: LivenessResponse) {
575 let component_id = response.request.component_id;
576 let timeout_key = response.request.timeout_key;
577 let request_sent = response.request.request_sent;
578 let response_sent = response.response_sent;
579 let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
580
581 let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
583 if !timeout_was_pending {
584 let mut registry = self.registry.lock().unwrap();
585 let component_state = &mut registry.component_state[component_id];
586
587 debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
588 }
589
590 let update = HealthUpdate::Alive {
592 last_response: response_sent,
593 last_response_latency: response_latency,
594 };
595 self.process_component_health_update(component_id, update);
596
597 if timeout_was_pending {
600 #[cfg(test)]
601 self.state.decrement_pending_probe_timeouts();
602
603 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
604 }
605 }
606
607 fn handle_component_timeout(&mut self, component_id: usize) {
608 self.process_component_health_update(component_id, HealthUpdate::Unknown);
610
611 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
613 }
614
615 fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
616 let mut registry = self.registry.lock().unwrap();
618 let component_state = &mut registry.component_state[component_id];
619 trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
620
621 match update {
622 HealthUpdate::Alive {
623 last_response,
624 last_response_latency,
625 } => component_state.mark_live(last_response, last_response_latency),
626 HealthUpdate::Unknown => component_state.mark_not_live(),
627 HealthUpdate::Dead => component_state.mark_dead(),
628 }
629 }
630
631 async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
632 info!("Health checker running.");
633
634 let mut responses_rx = self
637 .guard
638 .responses_rx
639 .take()
640 .expect("responses_rx should always be Some when Runner is created");
641
642 self.schedule_all_existing_components(&mut responses_rx);
646
647 let mut shutdown = std::pin::pin!(shutdown);
649
650 loop {
651 select! {
652 _ = &mut shutdown => {
654 info!("Health checker shutting down.");
655 break;
656 },
657
658 Some(entry) = self.pending_probes.next() => {
660 #[cfg(test)]
661 self.state.decrement_pending_scheduled_probes();
662
663 let component_id = entry.into_inner();
664 if let Some(health_update) = self.send_component_probe_request(component_id) {
665 self.process_component_health_update(component_id, health_update);
668 }
669 },
670
671 Some(entry) = self.pending_timeouts.next() => {
673 #[cfg(test)]
674 self.state.decrement_pending_probe_timeouts();
675
676 let component_id = entry.into_inner();
677 self.handle_component_timeout(component_id);
678 },
679
680 Some(response) = responses_rx.recv() => {
682 self.handle_component_probe_response(response);
683 },
684
685 _ = self.pending_components_notify.notified() => {
687 let pending_component_ids = self.drain_pending_components();
689 for pending_component_id in pending_component_ids {
690 self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
691 self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
692 }
693 },
694 }
695 }
696
697 self.guard.responses_rx = Some(responses_rx);
699
700 }
703}
704
#[cfg(test)]
mod tests {
    use std::future::Future;

    use futures::FutureExt as _;
    use tokio::sync::oneshot;
    use tokio_test::{
        assert_pending, assert_ready,
        task::{spawn, Spawn},
    };

    use super::*;

    const COMPONENT_ID: &str = "test_component";

    /// Builds a registry with one registered component and a spawned runner that never shuts down.
    ///
    /// Returns the component handle, the runner task, the shared registry state and the runner's test counters.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        let handle = registry.register_component(component_id).unwrap();

        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let shutdown = std::future::pending();
        let registry_task = spawn(runner.run(shutdown));

        (handle, registry_task, registry_state, runner_state)
    }

    /// Polls the task until it stops waking itself, asserting that it stays pending throughout.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }

    /// Returns whether the named component is currently recorded as live. Panics if it is not registered.
    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
        let state = state.lock().unwrap();
        state
            .component_state
            .iter()
            .find(|state| state.name == component_id)
            .map(|state| state.is_live())
            .unwrap()
    }

    #[test]
    fn basic_registration() {
        let registry = HealthRegistry::new();
        assert!(registry.register_component(COMPONENT_ID).is_some());
    }

    #[test]
    fn duplicate_component_registration_fails() {
        let registry = HealthRegistry::new();

        assert!(registry.register_component(COMPONENT_ID).is_some());
        assert!(registry.register_component(COMPONENT_ID).is_none());
    }

    #[test]
    fn duplicate_runner_creation_fails_while_running() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();

        // Keep the first runner alive so that it still holds the response receiver.
        let _runner = registry.into_runner().expect("first runner creation should succeed");

        assert!(registry2.into_runner().is_err());
    }

    #[tokio::test]
    async fn registry_can_be_respawned_after_shutdown() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();
        let registry3 = registry.clone();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let runner = registry.into_runner().expect("first runner creation should succeed");

        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));

        let _ = shutdown_tx.send(());

        join_handle.await.expect("runner should complete without panic");

        // The finished runner handed the receiver back, so a second runner can be created.
        let _runner2 = registry2
            .into_runner()
            .expect("should be able to create runner after shutdown");

        assert!(
            registry3.into_runner().is_err(),
            "should not be able to create runner while one exists"
        );
    }

    #[test]
    fn readiness() {
        // An empty registry is trivially ready.
        let registry = HealthRegistry::new();
        assert!(registry.all_ready());

        // Components start out not ready.
        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
        assert!(!registry.all_ready());

        handle.mark_ready();
        assert!(registry.all_ready());

        handle.mark_not_ready();
        assert!(!registry.all_ready());
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Nothing has been probed yet: the runner has not been polled.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner sends the first probe and registers its timeout.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The component sees the probe and responds.
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        // The runner processes the response.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // The timeout is cleared and the next probe is scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // Nothing has been probed yet: the runner has not been polled.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner sends the first probe and registers its timeout.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The component has been woken by the probe, but it is deliberately not polled yet.
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        // Let the probe time out.
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The timeout fired and a follow-up probe was scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // The component now answers the probe that already timed out.
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // The late response marks the component live but does not schedule a second probe.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    /// Like `initialize_registry_with_component`, but the runner stops when the returned sender fires or is dropped.
    #[track_caller]
    #[allow(clippy::type_complexity)]
    fn initialize_registry_with_component_and_shutdown(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
        oneshot::Sender<()>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();
        let handle = registry.register_component(component_id).unwrap();
        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));

        (handle, registry_task, registry_state, runner_state, shutdown_tx)
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_preserves_fresh_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Answer the first probe so the component becomes live.
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut the first runner down.
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Respawn immediately, without advancing time.
        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The last response is recent, so the component keeps its live state.
        assert!(component_live(&registry_state, COMPONENT_ID));
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_resets_stale_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        // Answer the first probe so the component becomes live.
        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        // Shut the first runner down.
        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Let more than the probe timeout pass while no runner is active.
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        // The last response is too old, so the new runner resets the component's liveness.
        assert!(!component_live(&registry_state, COMPONENT_ID));
    }
}