Skip to main content

saluki_core/health/
worker.rs

1use async_trait::async_trait;
2use saluki_api::{DynamicRoute, EndpointType};
3use saluki_common::sync::shutdown::ShutdownHandle;
4use saluki_error::generic_error;
5
6use super::HealthRegistry;
7use crate::runtime::{state::DataspaceRegistry, InitializationError, Supervisable, SupervisorFuture};
8
9/// A worker that runs the health registry.
10///
11/// This is the only way to run the health registry's liveness probing event loop. The worker
12/// implements [`Supervisable`], so it should be added to a [`Supervisor`][crate::runtime::Supervisor]
13/// to be managed as part of a supervision tree.
14pub struct HealthRegistryWorker {
15    health_registry: HealthRegistry,
16}
17
18impl HealthRegistryWorker {
19    pub(super) fn new(health_registry: HealthRegistry) -> Self {
20        Self { health_registry }
21    }
22}
23
24#[async_trait]
25impl Supervisable for HealthRegistryWorker {
26    fn name(&self) -> &str {
27        "health-registry"
28    }
29
30    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
31        let runner = self.health_registry.clone().into_runner()?;
32
33        let health_routes = DynamicRoute::http(EndpointType::Unprivileged, self.health_registry.api_handler());
34
35        Ok(Box::pin(async move {
36            // Register our API routes before we actually start running.
37            DataspaceRegistry::try_current()
38                .ok_or_else(|| generic_error!("Dataspace not available."))?
39                .assert(health_routes, "health-registry-api");
40
41            // We pass the shutdown handle into the runner here, instead of our usual `select! { shutdown => ...,
42            // main_loop_future => ... }` pattern because we try to ensure that we give back the liveness receiver
43            // before the runner completes.
44            //
45            // TODO: We should actually use something like a proper mutex guard so that returning the receiver happens
46            // automatically when the runner future goes out of scope and is dropped, since right now we wouldn't be
47            // able to ensure the current behavior (returning the receiver before the runner completes) happens in the
48            // face of an exceptional error.
49            runner.run(process_shutdown).await;
50
51            Ok(())
52        }))
53    }
54}