Skip to main content

saluki_app/
dynamic_api.rs

1//! Dynamic API server.
2//!
3//! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time,
4//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and dynamically registers and
5//! unregisters routes as they're asserted and retracted.
6
7use std::{
8    convert::Infallible,
9    error::Error,
10    future::Future,
11    net::SocketAddr,
12    panic::{catch_unwind, AssertUnwindSafe},
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
18use arc_swap::ArcSwap;
19use async_trait::async_trait;
20use axum::{body::Body as AxumBody, Router};
21use http::{Request, Response};
22use rcgen::{generate_simple_self_signed, CertifiedKey};
23use rustls::{pki_types::PrivateKeyDer, ServerConfig};
24use rustls_pki_types::PrivatePkcs8KeyDer;
25use saluki_api::{APIHandler, DynamicRoute, EndpointProtocol, EndpointType};
26use saluki_common::{collections::FastIndexMap, sync::shutdown::ShutdownHandle};
27use saluki_core::runtime::{
28    state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter, Subscription},
29    InitializationError, Supervisable, SupervisorFuture,
30};
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::{
33    listener::ConnectionOrientedListener,
34    server::{http::HttpServer, multiplex_service::MultiplexService},
35    util::hyper::TowerToHyperService,
36    ListenAddress,
37};
38use tokio::select;
39use tonic::{body::Body as GrpcBody, server::NamedService, service::RoutesBuilder};
40use tower::Service;
41use tracing::{debug, info, warn};
42
43/// The actual bound listen address of a running dynamic API server.
44///
45/// Asserted by dynamic API servers to allow discovering the exact socket address the server is bound to.
46#[derive(Clone, Debug)]
47pub struct BoundApiAddress(pub SocketAddr);
48
49/// A dynamic API server that can add and remove routes at runtime.
50///
51/// `DynamicAPIBuilder` serves HTTP and gRPC on a given address, multiplexing both protocols on a single port. Route
52/// additions and removals are handled by subscribing to assertions/retractions of [`DynamicRoute`] in the
53/// [`DataspaceRegistry`].
54///
55/// ## Adding and removing routes
56///
57/// Any process that wants to dynamically register API routes can simply assert a [`DynamicRoute`] in the
58/// [`DataspaceRegistry`]. Retracting the assertion will remove the route, either when retracted manually or when the
59/// process owning the route assertions exits.
60///
61/// If the dynamic API server is restarted, it will re-register any routes that were previously asserted.
62///
63/// ## Static handlers and services
64///
65/// In addition to dynamic routes, callers can register static HTTP handlers and gRPC services up-front via
66/// [`with_handler`][Self::with_handler], [`with_optional_handler`][Self::with_optional_handler], and
67/// [`with_grpc_service`][Self::with_grpc_service]. These form a base router that's cloned on every rebuild and merged
68/// with the currently asserted dynamic routes. Static routes take precedence on conflicts: a dynamic route whose path
69/// and method overlap with a static route is skipped (with a warning) until the conflict clears.
70///
71/// ## Assertions
72///
73/// - `BoundApiAddress`: the actual listen address bound by the API server. Identifier is `"dynamic-<type>-api"`, where
74///   `type` is the stringified value of `EndpointType::as_str` (for example, `"dynamic-privileged-api"`)
75pub struct DynamicAPIBuilder {
76    endpoint_type: EndpointType,
77    listen_address: ListenAddress,
78    tls_config: Option<ServerConfig>,
79    http_router: Router,
80    grpc_router: RoutesBuilder,
81}
82
83impl DynamicAPIBuilder {
84    /// Creates a new `DynamicAPIBuilder` for the given endpoint type and listen address.
85    pub fn new(endpoint_type: EndpointType, listen_address: ListenAddress) -> Self {
86        Self {
87            endpoint_type,
88            listen_address,
89            tls_config: None,
90            http_router: Router::new(),
91            grpc_router: RoutesBuilder::default(),
92        }
93    }
94
95    /// Adds the given handler as a static HTTP handler.
96    ///
97    /// The handler's initial state and routes are merged into the base router. These routes are always served by the
98    /// API regardless of which dynamic routes are currently asserted.
99    pub fn with_handler<H>(mut self, handler: H) -> Self
100    where
101        H: APIHandler,
102    {
103        let handler_router = handler.generate_routes();
104        let handler_state = handler.generate_initial_state();
105        self.http_router = self.http_router.merge(handler_router.with_state(handler_state));
106        self
107    }
108
109    /// Adds the given optional handler as a static HTTP handler.
110    ///
111    /// If `handler` is `Some`, its initial state and routes are merged into the base router. Otherwise the builder is
112    /// returned unchanged.
113    pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
114    where
115        H: APIHandler,
116    {
117        if let Some(handler) = handler {
118            self.with_handler(handler)
119        } else {
120            self
121        }
122    }
123
124    /// Adds the given gRPC service as a static service on the base router.
125    pub fn with_grpc_service<S>(mut self, svc: S) -> Self
126    where
127        S: Service<Request<GrpcBody>, Response = Response<GrpcBody>, Error = Infallible>
128            + NamedService
129            + Clone
130            + Send
131            + Sync
132            + 'static,
133        S::Future: Send + 'static,
134        S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
135    {
136        self.grpc_router.add_service(svc);
137        self
138    }
139
140    /// Sets the TLS configuration for the server.
141    pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
142        self.tls_config = Some(config);
143        self
144    }
145
146    /// Sets the TLS configuration for the server based on a dynamically generated, self-signed certificate.
147    pub fn with_self_signed_tls(self) -> Self {
148        let CertifiedKey { cert, signing_key } = generate_simple_self_signed(["localhost".to_owned()]).unwrap();
149        let cert_chain = vec![cert.der().clone()];
150        let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der()));
151
152        let config = ServerConfig::builder()
153            .with_no_client_auth()
154            .with_single_cert(cert_chain, key)
155            .unwrap();
156
157        self.with_tls_config(config)
158    }
159}
160
161#[async_trait]
162impl Supervisable for DynamicAPIBuilder {
163    fn name(&self) -> &str {
164        match self.endpoint_type {
165            EndpointType::Unprivileged => "dynamic-unprivileged-api",
166            EndpointType::Privileged => "dynamic-privileged-api",
167        }
168    }
169
170    async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
171        // Build the static base routers.
172        //
173        // We reset the fallback route of the base gRPC router as Tonic's `unimplemented` fallback handle will collide
174        // when merging additional gRPC service routers together. We do this for every gRPC service router that we merge
175        // and then we re-apply a fallback handler that returns a standard gRPC `UNIMPLEMENTED` response when we have
176        // our final, merged gRPC router.
177        let base_http = self.http_router.clone();
178        let base_grpc = self.grpc_router.clone().routes().into_axum_router().reset_fallback();
179
180        // Create dynamic inner routers for both HTTP and gRPC sides, seeded with the static base so that the static
181        // routes are served even before any dynamic routes are asserted. The gRPC seed gets the unimplemented fallback
182        // applied so unmatched gRPC requests return the correct status from the start.
183        let (inner_http, outer_http) = create_dynamic_router(base_http.clone());
184        let (inner_grpc, outer_grpc) = create_dynamic_router(grpc_post_process(base_grpc.clone()));
185
186        let dataspace = DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
187
188        // Bind the HTTP listener immediately so we fail fast on bind errors.
189        let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone())
190            .await
191            .map_err(|e| InitializationError::Failed { source: e.into() })?;
192
193        // Get the listen address our listener is bound to and assert it.
194        //
195        // This allows other processes to find out where we've bound to when the port isn't known ahead of time,
196        // such as during tests when binding to ephemeral ports.
197        let bound_addr = listener
198            .local_addr()
199            .map_err(|e| InitializationError::Failed { source: e.into() })?;
200        dataspace.assert(BoundApiAddress(bound_addr), Identifier::named(self.name()));
201
202        let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc));
203
204        let mut http_server = HttpServer::from_listener(listener, multiplexed_service);
205        if let Some(tls_config) = self.tls_config.clone() {
206            http_server = http_server.with_tls_config(tls_config);
207        }
208        let (server_shutdown_coordinator, error_handle) = http_server.listen();
209
210        let endpoint_type = self.endpoint_type;
211        let listen_address = self.listen_address.clone();
212
213        Ok(Box::pin(async move {
214            info!("Serving {} API on {}.", endpoint_type.name(), listen_address);
215
216            // Subscribe to all dynamic route assertions.
217            let route_assertions = dataspace.subscribe::<DynamicRoute>(IdentifierFilter::All);
218
219            select! {
220                _ = process_shutdown => {
221                    // Trigger the HTTP server to shut down and wait for it to do so gracefully.
222                    debug!(endpoint_type = endpoint_type.name(), "Triggering shutdown of dynamic API endpoint.");
223
224                    server_shutdown_coordinator.shutdown_and_wait().await;
225
226                    Ok(())
227                },
228                maybe_err = error_handle => match maybe_err {
229                    Some(e) => Err(GenericError::from(e)),
230                    None => Ok(()),
231                },
232                result = run_event_loop(inner_http, inner_grpc, base_http, base_grpc, route_assertions, endpoint_type) => result,
233            }
234        }))
235    }
236}
237
238/// A [`tower::Service`] that routes a request based on a dynamically updated [`Router`].
239///
240/// When installed as the fallback service for a top-level [`Router`], `DynamicRouterService` dynamically routing
241/// requests based on the current defined "inner" router, which itself can be hot-swapped at runtime. This allows for
242/// seamless updates to the API endpoint routing without requiring a restart of the HTTP listener or complicated
243/// configuration changes.
244#[derive(Clone)]
245struct DynamicRouterService {
246    inner_router: Arc<ArcSwap<Router>>,
247}
248
249impl DynamicRouterService {
250    fn from_inner(inner_router: &Arc<ArcSwap<Router>>) -> Self {
251        Self {
252            inner_router: Arc::clone(inner_router),
253        }
254    }
255}
256
257impl Service<http::Request<AxumBody>> for DynamicRouterService {
258    type Response = Response<AxumBody>;
259    type Error = Infallible;
260    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
261
262    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263        Poll::Ready(Ok(()))
264    }
265
266    fn call(&mut self, request: http::Request<AxumBody>) -> Self::Future {
267        let mut router = Arc::unwrap_or_clone(self.inner_router.load_full());
268        Box::pin(async move { router.call(request).await })
269    }
270}
271
272/// Runs the event loop that listens for route assertions/retractions and hot-swaps the inner routers.
273#[allow(clippy::too_many_arguments)]
274async fn run_event_loop(
275    inner_http: Arc<ArcSwap<Router>>, inner_grpc: Arc<ArcSwap<Router>>, base_http: Router, base_grpc: Router,
276    mut route_assertions: Subscription<DynamicRoute>, endpoint_type: EndpointType,
277) -> Result<(), GenericError> {
278    let mut http_handlers = FastIndexMap::default();
279    let mut grpc_handlers = FastIndexMap::default();
280
281    while let Some(update) = route_assertions.recv().await {
282        let mut rebuild_http = false;
283        let mut rebuild_grpc = false;
284
285        match update {
286            AssertionUpdate::Asserted(id, route) => {
287                if route.endpoint_type() != endpoint_type {
288                    continue;
289                }
290
291                match route.endpoint_protocol() {
292                    EndpointProtocol::Http => {
293                        debug!(?id, "Registering dynamic HTTP handler.");
294                        http_handlers.insert(id, route.into_router());
295
296                        rebuild_http = true;
297                    }
298                    EndpointProtocol::Grpc => {
299                        debug!(?id, "Registering dynamic gRPC handler.");
300                        grpc_handlers.insert(id, route.into_router());
301
302                        rebuild_grpc = true;
303                    }
304                }
305            }
306            AssertionUpdate::Retracted(id) => {
307                if http_handlers.swap_remove(&id).is_some() {
308                    debug!(?id, "Withdrawing dynamic HTTP handler.");
309                    rebuild_http = true;
310                }
311
312                if grpc_handlers.swap_remove(&id).is_some() {
313                    debug!(?id, "Withdrawing dynamic gRPC handler.");
314                    rebuild_grpc = true;
315                }
316            }
317        }
318
319        if rebuild_http {
320            rebuild_router(&inner_http, &base_http, &http_handlers, http_post_process);
321        }
322
323        if rebuild_grpc {
324            rebuild_router(&inner_grpc, &base_grpc, &grpc_handlers, grpc_post_process);
325        }
326    }
327
328    Ok(())
329}
330
331/// Creates a dynamic router pair: a swappable inner router (seeded with `initial`) and an outer router that delegates
332/// to it.
333fn create_dynamic_router(initial: Router) -> (Arc<ArcSwap<Router>>, Router) {
334    let inner = Arc::new(ArcSwap::from_pointee(initial));
335    let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner));
336    (inner, outer)
337}
338
339/// Attempts to merge `other` into `base`, returning the merged router on success.
340///
341/// `Router::merge` panics when two routers define overlapping routes (same path and HTTP method) and axum exposes no
342/// fallible alternative. Since `Router` is opaque -- there is no public API to inspect which paths/methods a router
343/// carries -- we can't detect conflicts ahead of time.
344///
345/// To recover from the panic without losing the accumulated router state, we clone `base` before the merge attempt.
346/// The clone is passed into `catch_unwind`: if the merge panics, only the clone is in a partially mutated state and it
347/// is simply dropped. The original `base` remains intact and is returned as-is. `AssertUnwindSafe` is sound here
348/// because:
349///
350/// - The closure captures only the clone (`candidate`) and a clone of `other`. Neither aliases mutable state that
351///   outlives the closure.
352/// - The panic originates from a deterministic format string in axum's `panic_on_err!` macro -- no locks are held and
353///   no resources are leaked in the panic path.
354/// - On panic, `candidate` is dropped without further use, so any internal inconsistency is irrelevant.
355fn try_merge_router(base: &Router, id: &Identifier, other: &Router) -> Result<Router, String> {
356    let candidate = base.clone();
357    match catch_unwind(AssertUnwindSafe(|| candidate.merge(other.clone()))) {
358        Ok(merged) => Ok(merged),
359        Err(payload) => {
360            let reason = payload
361                .downcast_ref::<String>()
362                .map(|s| s.as_str())
363                .or_else(|| payload.downcast_ref::<&str>().copied())
364                .unwrap_or("unknown");
365            Err(format!("failed to merge dynamic handler {id:?}: {reason}"))
366        }
367    }
368}
369
370/// Rebuilds the merged inner router from the static `base` and all currently registered dynamic handlers, applies
371/// `post_process` to the merged router, then stores the result in the [`ArcSwap`].
372fn rebuild_router(
373    inner_router: &Arc<ArcSwap<Router>>, base: &Router, handlers: &FastIndexMap<Identifier, Router>,
374    post_process: fn(Router) -> Router,
375) {
376    let mut merged = base.clone();
377    let mut skipped = 0usize;
378
379    for (id, router) in handlers.iter() {
380        let resetable = router.clone().reset_fallback();
381        match try_merge_router(&merged, id, &resetable) {
382            Ok(new_merged) => merged = new_merged,
383            Err(reason) => {
384                warn!(%reason, "Skipping dynamic handler due to overlapping route.");
385                skipped += 1;
386            }
387        }
388    }
389
390    let merged = post_process(merged);
391    inner_router.store(Arc::new(merged));
392    debug!(handler_count = handlers.len(), skipped, "Rebuilt inner router.");
393}
394
395fn http_post_process(router: Router) -> Router {
396    router
397}
398
399/// Adds a fallback handler that returns a standard gRPC `UNIMPLEMENTED` response when no other handler matches.
400fn grpc_post_process(router: Router) -> Router {
401    router.fallback(grpc_unimplemented)
402}
403
404async fn grpc_unimplemented() -> Response<AxumBody> {
405    tonic::Status::unimplemented("").into_http()
406}
407
408#[cfg(test)]
409mod tests {
410    use std::{net::SocketAddr, time::Duration};
411
412    use async_trait::async_trait;
413    use axum::Router;
414    use http_body_util::{BodyExt as _, Empty};
415    use hyper::{body::Bytes, StatusCode};
416    use hyper_util::{client::legacy::Client, rt::TokioExecutor};
417    use saluki_api::{APIHandler, DynamicRoute, EndpointType};
418    use saluki_core::runtime::{
419        state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter},
420        InitializationError, Supervisable, Supervisor, SupervisorFuture,
421    };
422    use tokio::{
423        pin, select,
424        sync::{mpsc, oneshot},
425        task::JoinHandle,
426        time::{sleep, timeout, Instant},
427    };
428
429    use super::*;
430
431    struct SimpleHandler {
432        path: &'static str,
433        body: &'static str,
434    }
435
436    impl APIHandler for SimpleHandler {
437        type State = ();
438
439        fn generate_initial_state(&self) -> Self::State {}
440
441        fn generate_routes(&self) -> Router<Self::State> {
442            let body = self.body;
443            Router::new().route(self.path, axum::routing::get(move || async move { body }))
444        }
445    }
446
447    enum RouteCommand {
448        Assert { id: Identifier, route: DynamicRoute },
449        Retract { id: Identifier },
450    }
451
452    struct RouteAsserter {
453        commands_rx: std::sync::Mutex<Option<mpsc::Receiver<RouteCommand>>>,
454        addr_tx: std::sync::Mutex<Option<oneshot::Sender<SocketAddr>>>,
455        endpoint_type: EndpointType,
456    }
457
458    #[async_trait]
459    impl Supervisable for RouteAsserter {
460        fn name(&self) -> &str {
461            "route-asserter"
462        }
463
464        async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
465            let mut commands_rx =
466                self.commands_rx
467                    .lock()
468                    .unwrap()
469                    .take()
470                    .ok_or_else(|| InitializationError::Failed {
471                        source: generic_error!("RouteAsserter can only be initialized once"),
472                    })?;
473            let addr_tx = self.addr_tx.lock().unwrap().take();
474            let endpoint_type = self.endpoint_type;
475
476            Ok(Box::pin(async move {
477                let dataspace =
478                    DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
479
480                // Wait for the DynamicAPIBuilder to assert its bound address.
481                let bound_addr_name = match endpoint_type {
482                    EndpointType::Unprivileged => "dynamic-unprivileged-api",
483                    EndpointType::Privileged => "dynamic-privileged-api",
484                };
485                let mut addr_sub =
486                    dataspace.subscribe::<BoundApiAddress>(IdentifierFilter::exact(Identifier::named(bound_addr_name)));
487
488                let addr = match addr_sub.recv().await {
489                    Some(AssertionUpdate::Asserted(_, BoundApiAddress(mut addr))) => {
490                        // Convert 0.0.0.0 to 127.0.0.1 so the test client can connect.
491                        if addr.ip().is_unspecified() {
492                            addr.set_ip(std::net::Ipv4Addr::LOCALHOST.into());
493                        }
494                        addr
495                    }
496                    other => return Err(generic_error!("unexpected bound address update: {:?}", other)),
497                };
498
499                if let Some(tx) = addr_tx {
500                    let _ = tx.send(addr);
501                }
502
503                // Process route commands until shutdown.
504                pin!(process_shutdown);
505
506                loop {
507                    select! {
508                        _ = &mut process_shutdown => break,
509                        cmd = commands_rx.recv() => {
510                            let Some(cmd) = cmd else { break };
511                            match cmd {
512                                RouteCommand::Assert { id, route } => {
513                                    dataspace.assert(route, id);
514                                }
515                                RouteCommand::Retract { id } => {
516                                    dataspace.retract::<DynamicRoute>(id);
517                                }
518                            }
519                        }
520                    }
521                }
522
523                Ok(())
524            }))
525        }
526    }
527
528    struct TestHarness {
529        addr: SocketAddr,
530        commands: mpsc::Sender<RouteCommand>,
531        _shutdown: oneshot::Sender<()>,
532        _handle: JoinHandle<()>,
533    }
534
535    impl TestHarness {
536        async fn assert_route(&self, id: impl Into<Identifier>, route: DynamicRoute) {
537            self.commands
538                .send(RouteCommand::Assert { id: id.into(), route })
539                .await
540                .unwrap();
541        }
542
543        async fn retract_route(&self, id: impl Into<Identifier>) {
544            self.commands
545                .send(RouteCommand::Retract { id: id.into() })
546                .await
547                .unwrap();
548        }
549    }
550
551    async fn setup_test_harness(endpoint_type: EndpointType) -> TestHarness {
552        setup_test_harness_with(endpoint_type, |b| b).await
553    }
554
555    async fn setup_test_harness_with<F>(endpoint_type: EndpointType, configure: F) -> TestHarness
556    where
557        F: FnOnce(DynamicAPIBuilder) -> DynamicAPIBuilder,
558    {
559        let (commands_tx, commands_rx) = mpsc::channel(16);
560        let (addr_tx, addr_rx) = oneshot::channel();
561
562        let api_builder = configure(DynamicAPIBuilder::new(endpoint_type, ListenAddress::any_tcp(0)));
563        let route_asserter = RouteAsserter {
564            commands_rx: std::sync::Mutex::new(Some(commands_rx)),
565            addr_tx: std::sync::Mutex::new(Some(addr_tx)),
566            endpoint_type,
567        };
568
569        let mut sup = Supervisor::new("test-dynamic-api").unwrap();
570        sup.add_worker(api_builder);
571        sup.add_worker(route_asserter);
572
573        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
574        let handle = tokio::spawn(async move {
575            let _ = sup.run_with_shutdown(shutdown_rx).await;
576        });
577
578        let addr = timeout(Duration::from_secs(5), addr_rx)
579            .await
580            .expect("timed out waiting for bound address")
581            .expect("addr channel closed");
582
583        TestHarness {
584            addr,
585            commands: commands_tx,
586            _shutdown: shutdown_tx,
587            _handle: handle,
588        }
589    }
590
591    async fn http_get(addr: SocketAddr, path: &str) -> (StatusCode, String) {
592        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
593        let uri = format!("http://{}{}", addr, path);
594        let resp = client.get(uri.parse().unwrap()).await.unwrap();
595        let status = resp.status();
596        let body = resp.into_body().collect().await.unwrap().to_bytes();
597        let body_str = String::from_utf8_lossy(&body).into_owned();
598        (status, body_str)
599    }
600
601    async fn grpc_post(addr: SocketAddr, path: &str) -> (StatusCode, http::HeaderMap) {
602        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
603        let uri: hyper::Uri = format!("http://{}{}", addr, path).parse().unwrap();
604        let req = hyper::Request::builder()
605            .uri(uri)
606            .method(hyper::Method::POST)
607            .header(hyper::header::CONTENT_TYPE, "application/grpc")
608            .body(Empty::<Bytes>::new())
609            .unwrap();
610        let resp = client.request(req).await.unwrap();
611        (resp.status(), resp.headers().clone())
612    }
613
614    async fn assert_status_eventually(addr: SocketAddr, path: &str, expected: StatusCode) -> String {
615        let deadline = Instant::now() + Duration::from_secs(2);
616        loop {
617            let (status, body) = http_get(addr, path).await;
618            if status == expected {
619                return body;
620            }
621            if Instant::now() > deadline {
622                panic!("expected {} for {} but got {}", expected, path, status);
623            }
624            sleep(Duration::from_millis(50)).await;
625        }
626    }
627
628    // -- Tests ---------------------------------------------------------------------------
629
630    #[tokio::test]
631    async fn serves_asserted_http_route() {
632        let harness = setup_test_harness(EndpointType::Unprivileged).await;
633
634        let route = DynamicRoute::http(
635            EndpointType::Unprivileged,
636            SimpleHandler {
637                path: "/health",
638                body: "ok",
639            },
640        );
641        harness.assert_route("health", route).await;
642
643        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
644        assert_eq!(body, "ok");
645    }
646
647    #[tokio::test]
648    async fn returns_404_for_unknown_route() {
649        let harness = setup_test_harness(EndpointType::Unprivileged).await;
650        let (status, _) = http_get(harness.addr, "/nonexistent").await;
651        assert_eq!(status, StatusCode::NOT_FOUND);
652    }
653
654    #[tokio::test]
655    async fn route_retraction_removes_route() {
656        let harness = setup_test_harness(EndpointType::Unprivileged).await;
657
658        let route = DynamicRoute::http(
659            EndpointType::Unprivileged,
660            SimpleHandler {
661                path: "/temp",
662                body: "temporary",
663            },
664        );
665        harness.assert_route("temp", route).await;
666        assert_status_eventually(harness.addr, "/temp", StatusCode::OK).await;
667
668        harness.retract_route("temp").await;
669        assert_status_eventually(harness.addr, "/temp", StatusCode::NOT_FOUND).await;
670    }
671
672    #[tokio::test]
673    async fn multiple_routes_independent_lifecycle() {
674        let harness = setup_test_harness(EndpointType::Unprivileged).await;
675
676        let route_a = DynamicRoute::http(
677            EndpointType::Unprivileged,
678            SimpleHandler {
679                path: "/a",
680                body: "alpha",
681            },
682        );
683        let route_b = DynamicRoute::http(
684            EndpointType::Unprivileged,
685            SimpleHandler {
686                path: "/b",
687                body: "bravo",
688            },
689        );
690        harness.assert_route("a", route_a).await;
691        harness.assert_route("b", route_b).await;
692
693        assert_status_eventually(harness.addr, "/a", StatusCode::OK).await;
694        assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
695
696        // Retract only /a -- /b should remain.
697        harness.retract_route("a").await;
698        assert_status_eventually(harness.addr, "/a", StatusCode::NOT_FOUND).await;
699
700        let body = assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
701        assert_eq!(body, "bravo");
702    }
703
704    #[tokio::test]
705    async fn ignores_routes_for_different_endpoint_type() {
706        let harness = setup_test_harness(EndpointType::Unprivileged).await;
707
708        // Assert a Privileged route on an Unprivileged server -- should be ignored.
709        let wrong_route = DynamicRoute::http(
710            EndpointType::Privileged,
711            SimpleHandler {
712                path: "/secret",
713                body: "secret",
714            },
715        );
716        harness.assert_route("secret", wrong_route).await;
717
718        let (status, _) = http_get(harness.addr, "/secret").await;
719        assert_eq!(status, StatusCode::NOT_FOUND);
720
721        // Now assert the same path with the correct endpoint type.
722        let right_route = DynamicRoute::http(
723            EndpointType::Unprivileged,
724            SimpleHandler {
725                path: "/secret",
726                body: "not secret",
727            },
728        );
729        harness.assert_route("secret-unpriv", right_route).await;
730
731        let body = assert_status_eventually(harness.addr, "/secret", StatusCode::OK).await;
732        assert_eq!(body, "not secret");
733    }
734
735    #[tokio::test]
736    async fn overlapping_routes_do_not_crash_server() {
737        let harness = setup_test_harness(EndpointType::Unprivileged).await;
738
739        // Assert a route at /health with identifier "health-1".
740        let route_1 = DynamicRoute::http(
741            EndpointType::Unprivileged,
742            SimpleHandler {
743                path: "/health",
744                body: "health-1",
745            },
746        );
747        harness.assert_route("health-1", route_1).await;
748        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
749        assert_eq!(body, "health-1");
750
751        // Assert a DIFFERENT identifier with the SAME path/method. Previously this caused a panic
752        // in rebuild_router. The server should remain alive with first-writer-wins semantics.
753        let route_2 = DynamicRoute::http(
754            EndpointType::Unprivileged,
755            SimpleHandler {
756                path: "/health",
757                body: "health-2",
758            },
759        );
760        harness.assert_route("health-2", route_2).await;
761
762        // Give the event loop time to process and rebuild.
763        sleep(Duration::from_millis(200)).await;
764
765        // Server is still alive; first handler wins.
766        let (status, body) = http_get(harness.addr, "/health").await;
767        assert_eq!(status, StatusCode::OK);
768        assert_eq!(body, "health-1");
769
770        // Non-overlapping routes are unaffected.
771        let route_info = DynamicRoute::http(
772            EndpointType::Unprivileged,
773            SimpleHandler {
774                path: "/info",
775                body: "info",
776            },
777        );
778        harness.assert_route("info", route_info).await;
779        let body = assert_status_eventually(harness.addr, "/info", StatusCode::OK).await;
780        assert_eq!(body, "info");
781
782        // Retract the first /health handler -- the previously skipped second handler should now
783        // become active since the conflict no longer exists.
784        harness.retract_route("health-1").await;
785        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
786        assert_eq!(body, "health-2");
787    }
788
789    #[tokio::test]
790    async fn overlapping_route_retraction_then_reassertion() {
791        let harness = setup_test_harness(EndpointType::Unprivileged).await;
792
793        // Assert two overlapping handlers.
794        let route_a = DynamicRoute::http(
795            EndpointType::Unprivileged,
796            SimpleHandler {
797                path: "/overlap",
798                body: "a",
799            },
800        );
801        let route_b = DynamicRoute::http(
802            EndpointType::Unprivileged,
803            SimpleHandler {
804                path: "/overlap",
805                body: "b",
806            },
807        );
808        harness.assert_route("ov-a", route_a).await;
809        harness.assert_route("ov-b", route_b).await;
810
811        // Server alive; first writer wins.
812        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
813        assert_eq!(body, "a");
814
815        // Retract both.
816        harness.retract_route("ov-a").await;
817        harness.retract_route("ov-b").await;
818        assert_status_eventually(harness.addr, "/overlap", StatusCode::NOT_FOUND).await;
819
820        // Re-assert a single handler -- should work cleanly.
821        let route_c = DynamicRoute::http(
822            EndpointType::Unprivileged,
823            SimpleHandler {
824                path: "/overlap",
825                body: "c",
826            },
827        );
828        harness.assert_route("ov-c", route_c).await;
829        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
830        assert_eq!(body, "c");
831    }
832
833    #[tokio::test]
834    async fn static_handler_served_without_dynamic_routes() {
835        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
836            b.with_handler(SimpleHandler {
837                path: "/static",
838                body: "static",
839            })
840        })
841        .await;
842
843        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
844        assert_eq!(body, "static");
845    }
846
847    #[tokio::test]
848    async fn static_and_dynamic_routes_coexist() {
849        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
850            b.with_handler(SimpleHandler {
851                path: "/static",
852                body: "static",
853            })
854        })
855        .await;
856
857        // Static route is served immediately.
858        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
859        assert_eq!(body, "static");
860
861        // Add a dynamic route on a different path -- both should serve.
862        let dynamic_route = DynamicRoute::http(
863            EndpointType::Unprivileged,
864            SimpleHandler {
865                path: "/dynamic",
866                body: "dynamic",
867            },
868        );
869        harness.assert_route("dyn", dynamic_route).await;
870
871        let body = assert_status_eventually(harness.addr, "/dynamic", StatusCode::OK).await;
872        assert_eq!(body, "dynamic");
873
874        let (status, body) = http_get(harness.addr, "/static").await;
875        assert_eq!(status, StatusCode::OK);
876        assert_eq!(body, "static");
877
878        // Retracting the dynamic route leaves the static route untouched.
879        harness.retract_route("dyn").await;
880        assert_status_eventually(harness.addr, "/dynamic", StatusCode::NOT_FOUND).await;
881
882        let (status, body) = http_get(harness.addr, "/static").await;
883        assert_eq!(status, StatusCode::OK);
884        assert_eq!(body, "static");
885    }
886
887    #[tokio::test]
888    async fn unknown_grpc_method_returns_unimplemented() {
889        let harness = setup_test_harness(EndpointType::Unprivileged).await;
890        let (status, headers) = grpc_post(harness.addr, "/some.Service/Method").await;
891
892        // gRPC errors are reported with HTTP 200 plus a `grpc-status` header. UNIMPLEMENTED is code 12.
893        assert_eq!(status, StatusCode::OK);
894        let grpc_status = headers.get("grpc-status").and_then(|v| v.to_str().ok());
895        assert_eq!(grpc_status, Some("12"));
896    }
897
898    #[tokio::test]
899    async fn static_route_wins_overlap_with_dynamic() {
900        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
901            b.with_handler(SimpleHandler {
902                path: "/overlap",
903                body: "static",
904            })
905        })
906        .await;
907
908        // Static route is served.
909        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
910        assert_eq!(body, "static");
911
912        // Asserting a dynamic route at the same path is skipped due to overlap -- static still wins.
913        let dynamic_route = DynamicRoute::http(
914            EndpointType::Unprivileged,
915            SimpleHandler {
916                path: "/overlap",
917                body: "dynamic",
918            },
919        );
920        harness.assert_route("dyn-overlap", dynamic_route).await;
921
922        sleep(Duration::from_millis(200)).await;
923
924        let (status, body) = http_get(harness.addr, "/overlap").await;
925        assert_eq!(status, StatusCode::OK);
926        assert_eq!(body, "static");
927    }
928}