saluki_core/health/worker.rs
1use async_trait::async_trait;
2use saluki_api::{DynamicRoute, EndpointType};
3use saluki_common::sync::shutdown::ShutdownHandle;
4use saluki_error::generic_error;
5
6use super::HealthRegistry;
7use crate::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
8
9/// A worker that runs the health registry.
10///
11/// This is the only way to run the health registry's liveness probing event loop. The worker
12/// implements [`Supervisable`], so it should be added to a [`Supervisor`][crate::runtime::Supervisor]
13/// to be managed as part of a supervision tree.
14pub struct HealthRegistryWorker {
15 health_registry: HealthRegistry,
16}
17
18impl HealthRegistryWorker {
19 pub(super) fn new(health_registry: HealthRegistry) -> Self {
20 Self { health_registry }
21 }
22}
23
24#[async_trait]
25impl Supervisable for HealthRegistryWorker {
26 fn name(&self) -> &str {
27 "health-registry"
28 }
29
30 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
31 let runner = self.health_registry.clone().into_runner()?;
32
33 let health_routes = DynamicRoute::http(EndpointType::Unprivileged, self.health_registry.api_handler());
34
35 Ok(Box::pin(async move {
36 // Register our API routes before we actually start running.
37 DataspaceRegistry::try_current()
38 .ok_or_else(|| generic_error!("Dataspace not available."))?
39 .assert(health_routes, "health-registry-api");
40
41 // We pass the shutdown handle into the runner here, instead of our usual `select! { shutdown => ...,
42 // main_loop_future => ... }` pattern because we try to ensure that we give back the liveness receiver
43 // before the runner completes.
44 //
45 // TODO: We should actually use something like a proper mutex guard so that returning the receiver happens
46 // automatically when the runner future goes out of scope and is dropped, since right now we wouldn't be
47 // able to ensure the current behavior (returning the receiver before the runner completes) happens in the
48 // face of an exceptional error.
49 runner.run(process_shutdown).await;
50
51 Ok(())
52 }))
53 }
54}