1#[cfg(test)]
2use std::sync::atomic::AtomicUsize;
3use std::{
4 collections::HashSet,
5 sync::{
6 atomic::{AtomicBool, Ordering::Relaxed},
7 Arc, Mutex,
8 },
9 time::{Duration, Instant},
10};
11
12use futures::StreamExt as _;
13use saluki_error::{generic_error, GenericError};
14use saluki_metrics::static_metrics;
15use stringtheory::MetaString;
16use tokio::{
17 select,
18 sync::{
19 mpsc::{self, error::TrySendError},
20 Notify,
21 },
22 task::JoinHandle,
23};
24use tokio_util::time::{delay_queue::Key, DelayQueue};
25use tracing::{debug, info, trace};
26
27mod api;
28pub use self::api::HealthAPIHandler;
29
/// How long the runner waits for a component to answer a liveness probe before marking its liveness as unknown.
///
/// Also recorded as the liveness latency sample whenever a component is marked unknown or dead.
const DEFAULT_PROBE_TIMEOUT_DUR: Duration = Duration::from_secs(5);
/// Delay between the end of one probe cycle (a response or a timeout) and the next probe being sent.
const DEFAULT_PROBE_BACKOFF_DUR: Duration = Duration::from_secs(1);
32
33pub struct Health {
35 shared: Arc<SharedComponentState>,
36 request_rx: mpsc::Receiver<LivenessRequest>,
37 response_tx: mpsc::Sender<LivenessResponse>,
38}
39
40impl Health {
41 pub fn mark_ready(&mut self) {
43 self.update_readiness(true);
44 }
45
46 pub fn mark_not_ready(&mut self) {
48 self.update_readiness(false);
49 }
50
51 fn update_readiness(&self, ready: bool) {
52 self.shared.ready.store(ready, Relaxed);
53 self.shared.telemetry.update_readiness(ready);
54 }
55
56 pub async fn live(&mut self) {
61 if let Some(request) = self.request_rx.recv().await {
64 let response = request.into_response();
65 let _ = self.response_tx.send(response).await;
66 }
67 }
68}
69
/// Liveness state of a component as tracked by the registry.
#[derive(Clone, Copy, Eq, PartialEq)]
enum HealthState {
    /// The component answered a liveness probe.
    Live,
    /// Liveness is not currently known: the component was just registered, or its last probe timed out.
    Unknown,
    /// The component's probe channel is closed, so it can no longer be probed.
    Dead,
}
76
77static_metrics!(
78 name => Telemetry,
79 prefix => health,
80 labels => [component_id: Arc<str>],
81 metrics => [
82 gauge(component_ready),
83 gauge(component_live),
84 trace_histogram(component_liveness_latency_seconds),
85 ]
86);
87
88impl Telemetry {
89 fn from_name(name: &str) -> Self {
90 Self::new(Arc::from(name))
91 }
92
93 fn update_readiness(&self, ready: bool) {
94 self.component_ready().set(if ready { 1.0 } else { 0.0 });
95 }
96
97 fn update_liveness(&self, state: HealthState, response_latency: Duration) {
98 let live = match state {
99 HealthState::Live => 1.0,
100 HealthState::Unknown => 0.0,
101 HealthState::Dead => -1.0,
102 };
103
104 self.component_live().set(live);
105 self.component_liveness_latency_seconds()
106 .record(response_latency.as_secs_f64());
107 }
108}
109
110struct SharedComponentState {
111 ready: AtomicBool,
112 telemetry: Telemetry,
113}
114
115struct ComponentState {
116 name: MetaString,
117 health: HealthState,
118 shared: Arc<SharedComponentState>,
119 request_tx: mpsc::Sender<LivenessRequest>,
120 last_response: Instant,
121 last_response_latency: Duration,
122}
123
124impl ComponentState {
125 fn new(name: MetaString, response_tx: mpsc::Sender<LivenessResponse>) -> (Self, Health) {
126 let shared = Arc::new(SharedComponentState {
127 ready: AtomicBool::new(false),
128 telemetry: Telemetry::from_name(&name),
129 });
130 let (request_tx, request_rx) = mpsc::channel(1);
131
132 let state = Self {
133 name,
134 health: HealthState::Unknown,
135 shared: Arc::clone(&shared),
136 request_tx,
137 last_response: Instant::now(),
138 last_response_latency: Duration::from_secs(0),
139 };
140
141 let handle = Health {
142 shared,
143 request_rx,
144 response_tx,
145 };
146
147 (state, handle)
148 }
149
150 fn is_ready(&self) -> bool {
151 self.shared.ready.load(Relaxed) && self.health != HealthState::Dead
156 }
157
158 fn is_live(&self) -> bool {
159 self.health == HealthState::Live
160 }
161
162 fn mark_live(&mut self, response_sent: Instant, response_latency: Duration) {
163 self.health = HealthState::Live;
164 self.last_response = response_sent;
165 self.last_response_latency = response_latency;
166 self.shared.telemetry.update_liveness(self.health, response_latency);
167 }
168
169 fn mark_not_live(&mut self) {
170 self.health = HealthState::Unknown;
171
172 self.shared
174 .telemetry
175 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
176 }
177
178 fn mark_dead(&mut self) {
179 self.health = HealthState::Dead;
180
181 self.shared
183 .telemetry
184 .update_liveness(self.health, DEFAULT_PROBE_TIMEOUT_DUR);
185 }
186}
187
188struct LivenessRequest {
189 component_id: usize,
190 timeout_key: Key,
191 request_sent: Instant,
192}
193
194impl LivenessRequest {
195 fn new(component_id: usize, timeout_key: Key) -> Self {
196 Self {
197 component_id,
198 timeout_key,
199 request_sent: Instant::now(),
200 }
201 }
202
203 fn into_response(self) -> LivenessResponse {
204 LivenessResponse {
205 request: self,
206 response_sent: Instant::now(),
207 }
208 }
209}
210
211struct LivenessResponse {
212 request: LivenessRequest,
213 response_sent: Instant,
214}
215
/// A change to a component's tracked health, applied by the runner.
enum HealthUpdate {
    /// The component answered a probe.
    Alive {
        /// When the component sent its response.
        last_response: Instant,
        /// Time between the probe's creation and the component's response.
        last_response_latency: Duration,
    },
    /// The component's liveness is unknown (newly registered, or its probe timed out).
    Unknown,
    /// The component can no longer be probed.
    Dead,
}

impl HealthUpdate {
    /// Lowercase label for the update, used in trace logs.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Alive { .. } => "alive",
            Self::Unknown => "unknown",
            Self::Dead => "dead",
        }
    }
}
234
235struct RegistryState {
236 registered_components: HashSet<MetaString>,
237 component_state: Vec<ComponentState>,
238 responses_tx: mpsc::Sender<LivenessResponse>,
239 responses_rx: Option<mpsc::Receiver<LivenessResponse>>,
240 pending_components: Vec<usize>,
241 pending_components_notify: Arc<Notify>,
242}
243
244impl RegistryState {
245 fn new() -> Self {
246 let (responses_tx, responses_rx) = mpsc::channel(16);
247
248 Self {
249 registered_components: HashSet::new(),
250 component_state: Vec::new(),
251 responses_tx,
252 responses_rx: Some(responses_rx),
253 pending_components: Vec::new(),
254 pending_components_notify: Arc::new(Notify::new()),
255 }
256 }
257}
258
259#[derive(Clone)]
277pub struct HealthRegistry {
278 inner: Arc<Mutex<RegistryState>>,
279}
280
281impl HealthRegistry {
282 pub fn new() -> Self {
284 Self {
285 inner: Arc::new(Mutex::new(RegistryState::new())),
286 }
287 }
288
289 #[cfg(test)]
290 fn state(&self) -> Arc<Mutex<RegistryState>> {
291 Arc::clone(&self.inner)
292 }
293
294 pub fn register_component<S: Into<MetaString>>(&self, name: S) -> Option<Health> {
299 let mut inner = self.inner.lock().unwrap();
300
301 let name = name.into();
303 if !inner.registered_components.insert(name.clone()) {
304 return None;
305 }
306
307 let (state, handle) = ComponentState::new(name.clone(), inner.responses_tx.clone());
309 let component_id = inner.component_state.len();
310 inner.component_state.push(state);
311
312 debug!(component_id, "Registered component '{}'.", name);
313
314 inner.pending_components.push(component_id);
316 inner.pending_components_notify.notify_one();
317
318 Some(handle)
319 }
320
321 pub fn api_handler(&self) -> HealthAPIHandler {
326 HealthAPIHandler::from_state(Arc::clone(&self.inner))
327 }
328
329 pub fn all_ready(&self) -> bool {
331 let inner = self.inner.lock().unwrap();
332
333 for component in &inner.component_state {
334 if !component.is_ready() {
335 return false;
336 }
337 }
338
339 true
340 }
341
342 fn into_runner(self) -> Result<Runner, GenericError> {
343 let (responses_rx, pending_components_notify) = {
345 let mut inner = self.inner.lock().unwrap();
346 let responses_rx = match inner.responses_rx.take() {
347 Some(rx) => rx,
348 None => return Err(generic_error!("health registry already spawned")),
349 };
350
351 let pending_components_notify = Arc::clone(&inner.pending_components_notify);
352 (responses_rx, pending_components_notify)
353 };
354
355 Ok(Runner::new(self.inner, responses_rx, pending_components_notify))
356 }
357
358 pub async fn spawn(self) -> Result<JoinHandle<()>, GenericError> {
364 let runner = self.into_runner()?;
365 Ok(tokio::spawn(runner.run()))
366 }
367}
368
/// Test-only counters of the runner's outstanding scheduled probes and probe timeouts.
#[cfg(test)]
struct RunnerState {
    pending_scheduled_probes: AtomicUsize,
    pending_probe_timeouts: AtomicUsize,
}

#[cfg(test)]
impl RunnerState {
    /// Creates both counters at zero.
    fn new() -> Self {
        Self {
            pending_scheduled_probes: AtomicUsize::default(),
            pending_probe_timeouts: AtomicUsize::default(),
        }
    }

    fn read(counter: &AtomicUsize) -> usize {
        counter.load(Relaxed)
    }

    fn incr(counter: &AtomicUsize) {
        counter.fetch_add(1, Relaxed);
    }

    fn decr(counter: &AtomicUsize) {
        counter.fetch_sub(1, Relaxed);
    }

    /// Number of probes scheduled whose delay has not yet elapsed.
    fn pending_scheduled_probes(&self) -> usize {
        Self::read(&self.pending_scheduled_probes)
    }

    /// Number of sent probes whose timeout is still pending.
    fn pending_probe_timeouts(&self) -> usize {
        Self::read(&self.pending_probe_timeouts)
    }

    fn increment_pending_scheduled_probes(&self) {
        Self::incr(&self.pending_scheduled_probes);
    }

    fn increment_pending_probe_timeouts(&self) {
        Self::incr(&self.pending_probe_timeouts);
    }

    fn decrement_pending_scheduled_probes(&self) {
        Self::decr(&self.pending_scheduled_probes);
    }

    fn decrement_pending_probe_timeouts(&self) {
        Self::decr(&self.pending_probe_timeouts);
    }
}
408
409struct Runner {
410 registry: Arc<Mutex<RegistryState>>,
411 pending_probes: DelayQueue<usize>,
412 pending_timeouts: DelayQueue<usize>,
413 responses_rx: mpsc::Receiver<LivenessResponse>,
414 pending_components_notify: Arc<Notify>,
415 #[cfg(test)]
416 state: Arc<RunnerState>,
417}
418
419impl Runner {
420 fn new(
421 registry: Arc<Mutex<RegistryState>>, responses_rx: mpsc::Receiver<LivenessResponse>,
422 pending_components_notify: Arc<Notify>,
423 ) -> Self {
424 #[cfg(test)]
425 let state = Arc::new(RunnerState::new());
426
427 Self {
428 registry,
429 pending_probes: DelayQueue::new(),
430 pending_timeouts: DelayQueue::new(),
431 responses_rx,
432 pending_components_notify,
433 #[cfg(test)]
434 state,
435 }
436 }
437
438 #[cfg(test)]
439 fn state(&self) -> Arc<RunnerState> {
440 Arc::clone(&self.state)
441 }
442
443 fn drain_pending_components(&mut self) -> Vec<usize> {
444 let mut registry = self.registry.lock().unwrap();
446 registry.pending_components.drain(..).collect()
447 }
448
449 fn send_component_probe_request(&mut self, component_id: usize) -> Option<HealthUpdate> {
450 let mut registry = self.registry.lock().unwrap();
451 let component_state = &mut registry.component_state[component_id];
452
453 if component_state.request_tx.is_closed() {
455 debug!(component_name = %component_state.name, "Component is dead, skipping liveness probe.");
456 return Some(HealthUpdate::Dead);
457 }
458
459 trace!(component_name = %component_state.name, probe_timeout = ?DEFAULT_PROBE_TIMEOUT_DUR, "Sending liveness probe to component.");
460
461 let timeout_key = self.pending_timeouts.insert(component_id, DEFAULT_PROBE_TIMEOUT_DUR);
466
467 #[cfg(test)]
468 self.state.increment_pending_probe_timeouts();
469
470 let request = LivenessRequest::new(component_id, timeout_key);
471 if let Err(TrySendError::Closed(request)) = component_state.request_tx.try_send(request) {
472 debug!(component_name = %component_state.name, "Component is dead, removing pending timeout.");
473
474 self.pending_timeouts.remove(&request.timeout_key);
480
481 #[cfg(test)]
482 self.state.decrement_pending_probe_timeouts();
483
484 return Some(HealthUpdate::Dead);
485 }
486
487 None
488 }
489
490 fn schedule_probe_for_component(&mut self, component_id: usize, duration: Duration) {
491 #[cfg(test)]
492 self.state.increment_pending_scheduled_probes();
493
494 self.pending_probes.insert(component_id, duration);
495 }
496
497 fn handle_component_probe_response(&mut self, response: LivenessResponse) {
498 let component_id = response.request.component_id;
499 let timeout_key = response.request.timeout_key;
500 let request_sent = response.request.request_sent;
501 let response_sent = response.response_sent;
502 let response_latency = response_sent.checked_duration_since(request_sent).unwrap_or_default();
503
504 let timeout_was_pending = self.pending_timeouts.try_remove(&timeout_key).is_some();
506 if !timeout_was_pending {
507 let mut registry = self.registry.lock().unwrap();
508 let component_state = &mut registry.component_state[component_id];
509
510 debug!(component_name = %component_state.name, "Received probe response for component that already timed out.");
511 }
512
513 let update = HealthUpdate::Alive {
515 last_response: response_sent,
516 last_response_latency: response_latency,
517 };
518 self.process_component_health_update(component_id, update);
519
520 if timeout_was_pending {
523 #[cfg(test)]
524 self.state.decrement_pending_probe_timeouts();
525
526 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
527 }
528 }
529
530 fn handle_component_timeout(&mut self, component_id: usize) {
531 self.process_component_health_update(component_id, HealthUpdate::Unknown);
533
534 self.schedule_probe_for_component(component_id, DEFAULT_PROBE_BACKOFF_DUR);
536 }
537
538 fn process_component_health_update(&mut self, component_id: usize, update: HealthUpdate) {
539 let mut registry = self.registry.lock().unwrap();
541 let component_state = &mut registry.component_state[component_id];
542 trace!(component_name = %component_state.name, status = update.as_str(), "Updating component health status.");
543
544 match update {
545 HealthUpdate::Alive {
546 last_response,
547 last_response_latency,
548 } => component_state.mark_live(last_response, last_response_latency),
549 HealthUpdate::Unknown => component_state.mark_not_live(),
550 HealthUpdate::Dead => component_state.mark_dead(),
551 }
552 }
553
554 async fn run(mut self) {
555 info!("Health checker running.");
556
557 loop {
558 select! {
559 Some(entry) = self.pending_probes.next() => {
561 #[cfg(test)]
562 self.state.decrement_pending_scheduled_probes();
563
564 let component_id = entry.into_inner();
565 if let Some(health_update) = self.send_component_probe_request(component_id) {
566 self.process_component_health_update(component_id, health_update);
569 }
570 },
571
572 Some(entry) = self.pending_timeouts.next() => {
574 #[cfg(test)]
575 self.state.decrement_pending_probe_timeouts();
576
577 let component_id = entry.into_inner();
578 self.handle_component_timeout(component_id);
579 },
580
581 Some(response) = self.responses_rx.recv() => {
583 self.handle_component_probe_response(response);
584 },
585
586 _ = self.pending_components_notify.notified() => {
588 let pending_component_ids = self.drain_pending_components();
590 for pending_component_id in pending_component_ids {
591 self.process_component_health_update(pending_component_id, HealthUpdate::Unknown);
592 self.schedule_probe_for_component(pending_component_id, Duration::ZERO);
593 }
594 },
595 }
596 }
597 }
598}
599
#[cfg(test)]
mod tests {
    use std::future::Future;

    use tokio_test::{
        assert_pending, assert_ready,
        task::{spawn, Spawn},
    };

    use super::*;

    const COMPONENT_ID: &str = "test_component";

    /// Creates a registry with `component_id` registered, plus a runner task that has not been polled yet.
    ///
    /// Returns the component's handle, the runner task, and the registry and runner state for inspection.
    #[track_caller]
    fn initialize_registry_with_component(
        component_id: &str,
    ) -> (
        Health,
        Spawn<impl Future<Output = ()>>,
        Arc<Mutex<RegistryState>>,
        Arc<RunnerState>,
    ) {
        let registry = HealthRegistry::new();
        let registry_state = registry.state();

        let handle = registry.register_component(component_id).unwrap();

        let runner = registry.into_runner().expect("should not fail to create runner");
        let runner_state = runner.state();
        let registry_task = spawn(runner.run());

        (handle, registry_task, registry_state, runner_state)
    }

    /// Polls `task` until it is no longer woken, asserting that it stays pending throughout.
    #[track_caller]
    fn drive_until_quiesced<F: Future<Output = ()>>(task: &mut Spawn<F>) {
        assert_pending!(task.poll());
        while task.is_woken() {
            assert_pending!(task.poll());
        }
    }

    /// Returns whether the component named `component_id` is live; panics if no such component is registered.
    fn component_live(state: &Mutex<RegistryState>, component_id: &str) -> bool {
        let state = state.lock().unwrap();
        state
            .component_state
            .iter()
            .find(|state| state.name == component_id)
            .map(|state| state.is_live())
            .unwrap()
    }

    #[test]
    fn basic_registration() {
        let registry = HealthRegistry::new();
        assert!(registry.register_component(COMPONENT_ID).is_some());
    }

    #[test]
    fn duplicate_component_registration_fails() {
        let registry = HealthRegistry::new();

        assert!(registry.register_component(COMPONENT_ID).is_some());
        assert!(registry.register_component(COMPONENT_ID).is_none());
    }

    #[tokio::test]
    async fn duplicate_registry_spawn_fails() {
        let registry = HealthRegistry::new();
        let registry2 = registry.clone();

        assert!(registry.spawn().await.is_ok());
        assert!(registry2.spawn().await.is_err());
    }

    #[test]
    fn readiness() {
        let registry = HealthRegistry::new();
        assert!(registry.all_ready());

        // A freshly registered component is not ready until it says so.
        let mut handle = registry.register_component(COMPONENT_ID).unwrap();
        assert!(!registry.all_ready());

        handle.mark_ready();
        assert!(registry.all_ready());

        handle.mark_not_ready();
        assert!(!registry.all_ready());
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_before_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // The component waits for a probe; the runner has not been polled yet, so nothing is outstanding.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner picks up the registration and sends the first probe, arming its timeout.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // No response has been processed yet.
        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The probe wakes the component, which answers it.
        assert!(live_future.is_woken());
        assert_ready!(live_future.poll());

        // The response wakes the runner, which marks the component live before the timeout fires.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        // The timeout was cancelled and the next probe scheduled.
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn component_responds_after_timeout() {
        let (mut handle, mut registry, registry_state, runner_state) = initialize_registry_with_component(COMPONENT_ID);

        // The component waits for a probe; the runner has not been polled yet, so nothing is outstanding.
        let mut live_future = spawn(handle.live());
        assert_pending!(live_future.poll());
        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        // Driving the runner picks up the registration and sends the first probe, arming its timeout.
        drive_until_quiesced(&mut registry);
        assert_eq!(runner_state.pending_probe_timeouts(), 1);
        assert_eq!(runner_state.pending_scheduled_probes(), 0);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        // The probe wakes the component, but it is not polled yet, so the runner has nothing to do.
        assert!(live_future.is_woken());
        assert!(!registry.is_woken());

        // Move past the probe timeout.
        tokio::time::advance(DEFAULT_PROBE_TIMEOUT_DUR + Duration::from_secs(1)).await;

        // The timeout fires: the component stays not live and the next probe is scheduled.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(!component_live(&registry_state, COMPONENT_ID));

        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);

        // The component finally answers the timed-out probe.
        assert_ready!(live_future.poll());

        // The late response still marks the component live. No extra probe is scheduled because one already is.
        assert!(registry.is_woken());
        drive_until_quiesced(&mut registry);

        assert!(component_live(&registry_state, COMPONENT_ID));

        assert_eq!(runner_state.pending_probe_timeouts(), 0);
        assert_eq!(runner_state.pending_scheduled_probes(), 1);
    }
}