1use std::future::Future;
4#[cfg(test)]
5use std::sync::atomic::AtomicUsize;
6use std::{
7 collections::HashSet,
8 sync::{
9 atomic::{AtomicBool, Ordering::Relaxed},
10 Arc, Mutex,
11 },
12 time::Duration,
13};
14
15use futures::StreamExt as _;
16use saluki_error::{generic_error, GenericError};
17use saluki_metrics::static_metrics;
18use stringtheory::MetaString;
19use tokio::{pin, time::Instant};
20use tokio::{
21 select,
22 sync::{
23 mpsc::{self, error::TrySendError},
24 Notify,
25 },
26};
27use tokio_util::time::{delay_queue::Key, DelayQueue};
28use tracing::{debug, info, trace};
29
30mod api;
31pub use self::api::HealthAPIHandler;
32
33mod worker;
34pub use self::worker::HealthRegistryWorker;
35
/// How long a component has to answer a liveness probe before it is reported as not live.
///
/// Also used as the staleness threshold when a runner is (re)started: components whose last answer is at least this
/// old are reset to an unknown state.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_millis(5_000);

/// Delay between a probe completing (answered in time, or timed out) and the next probe being sent.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_millis(1_000);
38
39pub struct Health {
41 shared: Arc<SharedComponentState>,
42 request_rx: mpsc::Receiver<LivenessRequest>,
43 response_tx: mpsc::Sender<LivenessResponse>,
44 readiness_notify: Arc<Notify>,
45}
46
47impl Health {
48 pub fn mark_ready(&mut self) {
50 self.update_readiness(true);
51 }
52
53 pub fn mark_not_ready(&mut self) {
55 self.update_readiness(false);
56 }
57
58 fn update_readiness(&self, ready: bool) {
59 self.shared.ready.store(ready, Relaxed);
60 self.shared.telemetry.update_readiness(ready);
61
62 if ready {
64 self.readiness_notify.notify_waiters();
65 }
66 }
67
68 pub async fn live(&mut self) {
73 if let Some(request) = self.request_rx.recv().await {
76 let response = request.into_response();
77 let _ = self.response_tx.send(response).await;
78 }
79 }
80}
81
82#[derive(Clone, Copy, Eq, PartialEq)]
83enum HealthState {
84 Live,
85 Unknown,
86 Dead,
87}
88
89static_metrics!(
90 name => Telemetry,
91 prefix => health,
92 labels => [component_id: Arc<str>],
93 metrics => [
94 gauge(component_ready),
95 gauge(component_live),
96 trace_histogram(component_liveness_latency_seconds),
97 ]
98);
99
100impl Telemetry {
101 fn from_name(name: &str) -> Self {
102 Self::new(Arc::from(name))
103 }
104
105 fn update_readiness(&self, ready: bool) {
106 self.component_ready().set(if ready { 1.0 } else { 0.0 });
107 }
108
109 fn update_liveness(&self, state: HealthState, response_latency: Duration) {
110 let live = match state {
111 HealthState::Live => 1.0,
112 HealthState::Unknown => 0.0,
113 HealthState::Dead => -1.0,
114 };
115
116 self.component_live().set(live);
117 self.component_liveness_latency_seconds()
118 .record(response_latency.as_secs_f64());
119 }
120}
121
122struct SharedComponentState {
123 ready: AtomicBool,
124 telemetry: Telemetry,
125}
126
127struct ComponentState {
128 name: MetaString,
129 health: HealthState,
130 shared: Arc<SharedComponentState>,
131 request_tx: mpsc::Sender<LivenessRequest>,
132 last_response: Instant,
133 last_response_latency: Duration,
134}
135
136impl ComponentState {
137 fn new(
138 name: MetaString, response_tx: mpsc::Sender<LivenessResponse>, readiness_notify: Arc<Notify>,
139 ) -> (Self, Health) {
140 let shared = Arc::new(SharedComponentState {
141 ready: AtomicBool::new(false),
142 telemetry: Telemetry::from_name(&name),
143 });
144 let (request_tx, request_rx) = mpsc::channel(1);
145
146 let state = Self {
147 name,
148 health: HealthState::Unknown,
149 shared: Arc::clone(&shared),
150 request_tx,
151 last_response: Instant::now(),
152 last_response_latency: Duration::from_secs(0),
153 };
154
155 let handle = Health {
156 shared,
157 request_rx,
158 response_tx,
159 readiness_notify,
160 };
161
162 (state, handle)
163 }
164
165 fn is_ready(&self) -> bool {
166 self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
171 }
172
173 fn is_live(&self) -> bool {
174 self.health == HealthState::Live
175 }
176
177 fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
178 self.health = HealthState::Live;
179 self.last_response = response_sent;
180 self.last_response_latency = response_latency;
181 self.shared.telemetry.update_liveness(self.health, response_latency);
182 }
183
184 fn mark_not_live(&mut self) {
185 self.health = HealthState::Unknown;
186
187 self.shared
189 .telemetry
190 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
191 }
192
193 fn mark_dead(&mut self) {
194 self.health = HealthState::Dead;
195
196 self.shared
198 .telemetry
199 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
200 }
201}
202
203struct LivenessRequest {
204 component_id: usize,
205 timeout_key: Key,
206 request_sent: Instant,
207}
208
209impl LivenessRequest {
210 fn new(component_id: usize, timeout_key: Key) -> Self {
211 Self {
212 component_id,
213 timeout_key,
214 request_sent: Instant::now(),
215 }
216 }
217
218 fn into_response(self) -> LivenessResponse {
219 LivenessResponse {
220 request: self,
221 response_sent: Instant::now(),
222 }
223 }
224}
225
226struct LivenessResponse {
227 request: LivenessRequest,
228 response_sent: Instant,
229}
230
231enum HealthUpdate {
232 Alive {
233 last_response: Instant,
234 last_response_latency: Duration,
235 },
236 Unknown,
237 Dead,
238}
239
240impl HealthUpdate {
241 fn as_str(&self) -> &'static str {
242 match self {
243 HealthUpdate::Alive { .. } => "alive",
244 HealthUpdate::Unknown => "unknown",
245 HealthUpdate::Dead => "dead",
246 }
247 }
248}
249
250struct RegistryState {
251 registered_components: HashSet<MetaString>,
252 component_state: Vec<ComponentState>,
253 responses_tx: mpsc::Sender<LivenessResponse>,
254 responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
255 pending_components: Vec<usize>,
256 pending_components_notify: Arc<Notify>,
257 readiness_notify: Arc<Notify>,
258}
259
260impl RegistryState {
261 fn new() -> Self {
262 let (responses_tx, responses_rx) = mpsc::channel(16);
263
264 Self {
265 registered_components: HashSet::new(),
266 component_state: Vec::new(),
267 responses_tx,
268 responses_rx: Some(responses_rx),
269 pending_components: Vec::new(),
270 pending_components_notify: Arc::new(Notify::new()),
271 readiness_notify: Arc::new(Notify::new()),
272 }
273 }
274}
275
276#[derive(Clone)]
294pub struct HealthRegistry {
295 inner: Arc<Mutex<RegistryState>>,
296}
297
298impl HealthRegistry {
299 pub fn new() -> Self {
301 Self {
302 inner: Arc::new(Mutex::new(RegistryState::new())),
303 }
304 }
305
306 #[cfg(test)]
307 fn state(&self) -> Arc<Mutex<RegistryState>> {
308 Arc::clone(&self.inner)
309 }
310
311 pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
316 let mut inner = self.inner.lock().unwrap();
317
318 let name = name.into();
320 if !inner.registered_components.insert(name.clone()) {
321 return None;
322 }
323
324 let readiness_notify = Arc::clone(&inner.readiness_notify);
326 let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone(), readiness_notify);
327 let component_id = inner.component_state.len();
328 inner.component_state.push(state);
329
330 debug!(component_id, "Registered component '{}'.", name);
331
332 inner.pending_components.push(component_id);
334 inner.pending_components_notify.notify_one();
335
336 Some(handle)
337 }
338
339 pub fn api_handler(&self) -> HealthAPIHandler {
344 HealthAPIHandler::from_state(Arc::clone(&self.inner))
345 }
346
347 pub async fn all_ready(&self) {
355 self.all_ready_matching(|_| true).await
356 }
357
358 pub async fn all_ready_matching<F>(&self, predicate: F)
368 where
369 F: Fn(&str) -> bool,
370 {
371 let readiness_notify = {
372 let inner = self.inner.lock().unwrap();
373 Arc::clone(&inner.readiness_notify)
374 };
375
376 loop {
377 let notified = readiness_notify.notified();
379
380 if self.check_ready_matching(&predicate) {
381 return;
382 }
383
384 notified.await;
385 }
386 }
387
388 fn check_ready_matching<F>(&self, predicate: &F) -> bool
389 where
390 F: Fn(&str) -> bool,
391 {
392 let inner = self.inner.lock().unwrap();
393 inner
394 .component_state
395 .iter()
396 .filter(|component| predicate(&component.name))
397 .all(|component| component.is_ready())
398 }
399
400 pub fn worker(&self) -> HealthRegistryWorker {
405 HealthRegistryWorker::new(self.clone())
406 }
407
408 pub(crate) fn into_runner(self) -> Result<Runner, GenericError> {
409 let (responses_rx, pending_components_notify) = {
411 let mut inner = self.inner.lock().unwrap();
412 let responses_rx = match inner.responses_rx.take() {
413 Some(rx) => rx,
414 None => return Err(generic_error!("health registry already spawned")),
415 };
416
417 let pending_components_notify = Arc::clone(&inner.pending_components_notify);
418 (responses_rx, pending_components_notify)
419 };
420
421 Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
422 }
423}
424
425struct RunnerGuard {
431 registry: Arc<Mutex<RegistryState>>,
432 responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
433}
434
435impl Drop for RunnerGuard {
436 fn drop(&mut self) {
437 if let Some(rx) = self.responses_rx.take() {
438 let mut inner = self.registry.lock().expect("registry state poisoned");
439 inner.responses_rx = Some(rx);
440 debug!("Returned response receiver to registry state.");
441 }
442 }
443}
444
/// Counters that let tests observe how many probes and probe timeouts the runner currently has outstanding.
#[cfg(test)]
struct RunnerState {
    pending_scheduled_probes: AtomicUsize,
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates a state with both counters at zero.
    fn new() -> Self {
        Self {
            pending_scheduled_probes: AtomicUsize::default(),
            pending_probe_timeouts: AtomicUsize::default(),
        }
    }

    /// Number of probes scheduled but not yet sent.
    fn pending_scheduled_probes(&self) -> usize {
        Self::read(&self.pending_scheduled_probes)
    }

    /// Number of probe timeouts currently armed.
    fn pending_probe_timeouts(&self) -> usize {
        Self::read(&self.pending_probe_timeouts)
    }

    fn increment_pending_scheduled_probes(&self) {
        Self::bump(&self.pending_scheduled_probes);
    }

    fn increment_pending_probe_timeouts(&self) {
        Self::bump(&self.pending_probe_timeouts);
    }

    fn decrement_pending_scheduled_probes(&self) {
        Self::drop_one(&self.pending_scheduled_probes);
    }

    fn decrement_pending_probe_timeouts(&self) {
        Self::drop_one(&self.pending_probe_timeouts);
    }

    fn read(counter: &AtomicUsize) -> usize {
        counter.load(Relaxed)
    }

    fn bump(counter: &AtomicUsize) {
        counter.fetch_add(1, Relaxed);
    }

    fn drop_one(counter: &AtomicUsize) {
        counter.fetch_sub(1, Relaxed);
    }
}
484
485pub(super) struct Runner {
486 registry: Arc<Mutex<RegistryState>>,
487 pending_probes: DelayQueue<usize>,
488 pending_timeouts: DelayQueue<usize>,
489 guard: RunnerGuard,
490 pending_components_notify: Arc<Notify>,
491 #[cfg(test)]
492 state: Arc<RunnerState>,
493}
494
495impl Runner {
496 fn new(
497 registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
498 pending_components_notify: Arc<Notify>,
499 ) -> Self {
500 #[cfg(test)]
501 let state = Arc::new(RunnerState::new());
502
503 let guard = RunnerGuard {
504 registry: Arc::clone(®istry),
505 responses_rx: Some(responses_rx),
506 };
507
508 Self {
509 registry,
510 pending_probes: DelayQueue::new(),
511 pending_timeouts: DelayQueue::new(),
512 guard,
513 pending_components_notify,
514 #[cfg(test)]
515 state,
516 }
517 }
518
519 #[cfg(test)]
520 fn state(&self) -> Arc<RunnerState> {
521 Arc::clone(&self.state)
522 }
523
524 fn drain_pending_components(&mut self) -> Vec<usize> {
525 let mut registry = self.registry.lock().unwrap();
527 registry.pending_components.drain(..).collect()
528 }
529
530 fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
531 let mut registry = self.registry.lock().unwrap();
532 let component_state = &mut registry.component_state[component_id];
533
534 if component_state.request_tx.is_closed() {
536 debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
537 return Some(HealthUpdate::Dead);
538 }
539
540 trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
541
542 let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
547
548 #[cfg(test)]
549 self.state.increment_pending_probe_timeouts();
550
551 let request = LivenessRequest::new(component_id, timeout_key);
552 if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
553 debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
554
555 self.pending_timeouts.remove(&request.timeout_key);
561
562 #[cfg(test)]
563 self.state.decrement_pending_probe_timeouts();
564
565 return Some(HealthUpdate::Dead);
566 }
567
568 None
569 }
570
571 fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
572 #[cfg(test)]
573 self.state.increment_pending_scheduled_probes();
574
575 self.pending_probes.insert(component_id, duration);
576 }
577
578 fn schedule_all_existing_components(&mut self, responses_rx: &mut mpsc::Receiver<LivenessResponse>) {
579 let _pending = self.drain_pending_components();
582
583 while let Ok(response) = responses_rx.try_recv() {
588 self.handle_component_probe_response(response);
589 }
590
591 let (component_count, stale_component_ids) = {
595 let registry = self.registry.lock().unwrap();
596 let now = Instant::now();
597 let stale_ids: Vec<usize> = (0..registry.component_state.len())
598 .filter(|&id| {
599 now.duration_since(registry.component_state[id].last_response) >= DEFAULT_PROBE_TIMEOUT_DUR
600 })
601 .collect();
602 (registry.component_state.len(), stale_ids)
603 };
604
605 for &component_id in &stale_component_ids {
607 self.process_component_health_update(component_id, HealthUpdate::Unknown);
608 }
609
610 for component_id in 0..component_count {
612 self.schedule_probe_for_component(component_id, Duration::ZERO);
613 }
614
615 if component_count > 0 {
616 let fresh_count = component_count - stale_component_ids.len();
617 debug!(
618 component_count,
619 fresh_count,
620 stale_count = stale_component_ids.len(),
621 "Scheduled probes for all existing components."
622 );
623 }
624 }
625
626 fn handle_component_probe_response(&mut self, response: LivenessResponse) {
627 let component_id = response.request.component_id;
628 let timeout_key = response.request.timeout_key;
629 let request_sent = response.request.request_sent;
630 let response_sent = response.response_sent;
631 let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
632
633 let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
635 if !timeout_was_pending {
636 let mut registry = self.registry.lock().unwrap();
637 let component_state = &mut registry.component_state[component_id];
638
639 debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
640 }
641
642 let update = HealthUpdate::Alive {
644 last_response: response_sent,
645 last_response_latency: response_latency,
646 };
647 self.process_component_health_update(component_id, update);
648
649 if timeout_was_pending {
652 #[cfg(test)]
653 self.state.decrement_pending_probe_timeouts();
654
655 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
656 }
657 }
658
659 fn handle_component_timeout(&mut self, component_id: usize) {
660 self.process_component_health_update(component_id, HealthUpdate::Unknown);
662
663 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
665 }
666
667 fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
668 let mut registry = self.registry.lock().unwrap();
670 let component_state = &mut registry.component_state[component_id];
671 trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
672
673 match update {
674 HealthUpdate::Alive {
675 last_response,
676 last_response_latency,
677 } => component_state.mark_live(last_response, last_response_latency),
678 HealthUpdate::Unknown => component_state.mark_not_live(),
679 HealthUpdate::Dead => component_state.mark_dead(),
680 }
681 }
682
683 async fn run<F: Future<Output = ()>>(mut self, shutdown: F) {
684 info!("Health checker running.");
685
686 let mut responses_rx = self
689 .guard
690 .responses_rx
691 .take()
692 .expect("responses_rx should always be Some when Runner is created");
693
694 self.schedule_all_existing_components(&mut responses_rx);
698
699 pin!(shutdown);
701
702 loop {
703 select! {
704 _ = &mut shutdown => {
706 info!("Health checker shutting down.");
707 break;
708 },
709
710 Some(entry) = self.pending_probes.next() => {
712 #[cfg(test)]
713 self.state.decrement_pending_scheduled_probes();
714
715 let component_id = entry.into_inner();
716 if let Some(health_update) = self.send_component_probe_request(component_id) {
717 self.process_component_health_update(component_id, health_update);
720 }
721 },
722
723 Some(entry) = self.pending_timeouts.next() => {
725 #[cfg(test)]
726 self.state.decrement_pending_probe_timeouts();
727
728 let component_id = entry.into_inner();
729 self.handle_component_timeout(component_id);
730 },
731
732 Some(response) = responses_rx.recv() => {
734 self.handle_component_probe_response(response);
735 },
736
737 _ = self.pending_components_notify.notified() => {
739 let pending_component_ids = self.drain_pending_components();
741 for pending_component_id in pending_component_ids {
742 self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
743 self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
744 }
745 },
746 }
747 }
748
749 self.guard.responses_rx = Some(responses_rx);
751
752 }
755}
756
#[cfg(test)]
mod tests {
    use std::future::Future;

    use futures::FutureExt as _;
    use tokio::sync::oneshot;
    use tokio_test::{
        assert_pending, assert_ready,
        task::{spawn, Spawn},
    };

    use super::*;

    const COMPONENT_ID: &str = "test_component";

    /// Creates a registry with one registered component and wraps its runner (which never shuts down) in a
    /// manually polled task.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        let handle = registry.register_component(component_id).unwrap();

        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let shutdown = std::future::pending();
        let registry_task = spawn(runner.run(shutdown));

        (handle, registry_task, registry_state, runner_state)
    }

    /// Polls `task` until it stops waking itself, asserting it never completes.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }

    /// Returns whether the component named `component_id` is currently live; panics if it is not registered.
    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
        let state = state.lock().unwrap();
        state
            .component_state
            .iter()
            .find(|state| state.name == component_id)
            .map(|state| state.is_live())
            .unwrap()
    }

    #[test]
    fn basic_registration() {
        let registry = HealthRegistry::new();
        assert!(registry.register_component(COMPONENT_ID).is_some());
    }

    #[test]
    fn duplicate_component_registration_fails() {
        let registry = HealthRegistry::new();

        assert!(registry.register_component(COMPONENT_ID).is_some());
        assert!(registry.register_component(COMPONENT_ID).is_none());
    }

    #[test]
    fn duplicate_runner_creation_fails_while_running() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();

        let _runner = registry.into_runner().expect("first runner creation should succeed");

        assert!(registry2.into_runner().is_err());
    }

    #[tokio::test]
    async fn registry_can_be_respawned_after_shutdown() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();
        let registry3 = registry.clone();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let runner = registry.into_runner().expect("first runner creation should succeed");

        let join_handle = tokio::spawn(runner.run(shutdown_rx.map(|_| ())));

        let _ = shutdown_tx.send(());

        join_handle.await.expect("runner should complete without panic");

        let _runner2 = registry2
            .into_runner()
            .expect("should be able to create runner after shutdown");

        assert!(
            registry3.into_runner().is_err(),
            "should not be able to create runner while one exists"
        );
    }

    #[test]
    fn readiness() {
        let registry = HealthRegistry::new();

        // With no components registered, everything is trivially ready.
        let mut all_ready_fut = spawn(registry.all_ready());
        assert_ready!(all_ready_fut.poll());

        let mut handle = registry.register_component(COMPONENT_ID).unwrap();

        let mut all_ready_fut = spawn(registry.all_ready());
        assert_pending!(all_ready_fut.poll());

        handle.mark_ready();

        assert!(all_ready_fut.is_woken());
        assert_ready!(all_ready_fut.poll());

        let mut all_ready_fut = spawn(registry.all_ready());
        assert_ready!(all_ready_fut.poll());

        handle.mark_not_ready();

        let mut all_ready_fut = spawn(registry.all_ready());
        assert_pending!(all_ready_fut.poll());
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // The runner sends the initial probe and arms its timeout.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The component receives the probe and answers it.
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // The timeout is cancelled and the next probe is scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The probe is delivered, but the component does not answer yet.
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // The timeout fires: the component stays not live and the next probe is scheduled.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // The late answer still marks the component live, but schedules no extra probe.
        assert_ready!(live_future.poll());

        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    /// Like `initialize_registry_with_component`, but the runner stops when the returned sender fires.
    #[track_caller]
    #[allow(clippy::type_complexity)]
    fn initialize_registry_with_component_and_shutdown(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
        oneshot::Sender<()>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();
        let handle = registry.register_component(component_id).unwrap();
        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let registry_task = spawn(runner.run(shutdown_rx.map(|_| ())));

        (handle, registry_task, registry_state, runner_state, shutdown_tx)
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_preserves_fresh_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Respawn immediately: the last answer is still fresh, so liveness is preserved.
        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));
    }

    #[tokio::test(start_paused = true)]
    async fn respawn_resets_stale_component_health() {
        let (mut handle, mut registry, registry_state, _runner_state, shutdown_tx) =
            initialize_registry_with_component_and_shutdown(COMPONENT_ID);
        drive_until_quiesced(&mut registry);

        let mut live_future = spawn(handle.live());
        assert_ready!(live_future.poll());
        drive_until_quiesced(&mut registry);
        assert!(component_live(&registry_state, COMPONENT_ID));

        let _ = shutdown_tx.send(());
        assert_ready!(registry.poll());

        // Let the last answer go stale before respawning.
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        let registry = HealthRegistry {
            inner: Arc::clone(&registry_state),
        };
        let runner = registry
            .into_runner()
            .expect("should be able to respawn after shutdown");
        let _runner_state = runner.state();
        let mut registry = spawn(runner.run(std::future::pending()));
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));
    }
}