Skip to main content

saluki_app/
dynamic_api.rs

1//! Dynamic API server.
2//!
3//! Unlike [`APIBuilder`][crate::api::APIBuilder], which constructs its route set once at build time,
4//! `DynamicAPIBuilder` subscribes to runtime notifications via the dataspace registry and dynamically registers and
5//! unregisters routes as they're asserted and retracted.
6
7use std::{
8    convert::Infallible,
9    error::Error,
10    future::Future,
11    net::SocketAddr,
12    panic::{catch_unwind, AssertUnwindSafe},
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
18use arc_swap::ArcSwap;
19use async_trait::async_trait;
20use axum::{body::Body as AxumBody, Router};
21use http::{Request, Response};
22use rcgen::{generate_simple_self_signed, CertifiedKey};
23use rustls::{pki_types::PrivateKeyDer, ServerConfig};
24use rustls_pki_types::PrivatePkcs8KeyDer;
25use saluki_api::{APIHandler, DynamicRoute, EndpointProtocol, EndpointType};
26use saluki_common::collections::FastIndexMap;
27use saluki_core::runtime::{
28    state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter, Subscription},
29    InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
30};
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::{
33    listener::ConnectionOrientedListener,
34    server::{
35        http::{ErrorHandle, HttpServer, ShutdownHandle},
36        multiplex_service::MultiplexService,
37    },
38    util::hyper::TowerToHyperService,
39    ListenAddress,
40};
41use tokio::{pin, select};
42use tonic::{body::Body as GrpcBody, server::NamedService, service::RoutesBuilder};
43use tower::Service;
44use tracing::{debug, info, warn};
45
/// Socket address that a running dynamic API server is actually bound to.
///
/// Dynamic API servers assert this value so that other processes can discover the exact address being listened on,
/// which may differ from the configured one (for example, when the configured port is 0).
#[derive(Debug, Clone)]
pub struct BoundApiAddress(pub SocketAddr);
51
52/// A dynamic API server that can add and remove routes at runtime.
53///
54/// `DynamicAPIBuilder` serves HTTP and gRPC on a given address, multiplexing both protocols on a single port. Route
55/// additions and removals are handled by subscribing to assertions/retractions of [`DynamicRoute`] in the
56/// [`DataspaceRegistry`].
57///
58/// ## Adding and removing routes
59///
60/// Any process that wants to dynamically register API routes can simply assert a [`DynamicRoute`] in the
61/// [`DataspaceRegistry`]. Retracting the assertion will remove the route, either when retracted manually or when the
62/// process owning the route assertions exits.
63///
64/// If the dynamic API server is restarted, it will re-register any routes that were previously asserted.
65///
66/// ## Static handlers and services
67///
68/// In addition to dynamic routes, callers can register static HTTP handlers and gRPC services up-front via
69/// [`with_handler`][Self::with_handler], [`with_optional_handler`][Self::with_optional_handler], and
70/// [`with_grpc_service`][Self::with_grpc_service]. These form a base router that's cloned on every rebuild and merged
71/// with the currently asserted dynamic routes. Static routes take precedence on conflicts: a dynamic route whose path
72/// and method overlap with a static route is skipped (with a warning) until the conflict clears.
73///
74/// ## Assertions
75///
76/// - `BoundApiAddress`: the actual listen address bound by the API server. Identifier is `"dynamic-<type>-api"`, where
77///   `type` is the stringified value of `EndpointType::as_str` (for example, `"dynamic-privileged-api"`)
78pub struct DynamicAPIBuilder {
79    endpoint_type: EndpointType,
80    listen_address: ListenAddress,
81    tls_config: Option<ServerConfig>,
82    http_router: Router,
83    grpc_router: RoutesBuilder,
84}
85
86impl DynamicAPIBuilder {
87    /// Creates a new `DynamicAPIBuilder` for the given endpoint type and listen address.
88    pub fn new(endpoint_type: EndpointType, listen_address: ListenAddress) -> Self {
89        Self {
90            endpoint_type,
91            listen_address,
92            tls_config: None,
93            http_router: Router::new(),
94            grpc_router: RoutesBuilder::default(),
95        }
96    }
97
98    /// Adds the given handler as a static HTTP handler.
99    ///
100    /// The handler's initial state and routes are merged into the base router. These routes are always served by the
101    /// API regardless of which dynamic routes are currently asserted.
102    pub fn with_handler<H>(mut self, handler: H) -> Self
103    where
104        H: APIHandler,
105    {
106        let handler_router = handler.generate_routes();
107        let handler_state = handler.generate_initial_state();
108        self.http_router = self.http_router.merge(handler_router.with_state(handler_state));
109        self
110    }
111
112    /// Adds the given optional handler as a static HTTP handler.
113    ///
114    /// If `handler` is `Some`, its initial state and routes are merged into the base router. Otherwise the builder is
115    /// returned unchanged.
116    pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
117    where
118        H: APIHandler,
119    {
120        if let Some(handler) = handler {
121            self.with_handler(handler)
122        } else {
123            self
124        }
125    }
126
127    /// Adds the given gRPC service as a static service on the base router.
128    pub fn with_grpc_service<S>(mut self, svc: S) -> Self
129    where
130        S: Service<Request<GrpcBody>, Response = Response<GrpcBody>, Error = Infallible>
131            + NamedService
132            + Clone
133            + Send
134            + Sync
135            + 'static,
136        S::Future: Send + 'static,
137        S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
138    {
139        self.grpc_router.add_service(svc);
140        self
141    }
142
143    /// Sets the TLS configuration for the server.
144    pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
145        self.tls_config = Some(config);
146        self
147    }
148
149    /// Sets the TLS configuration for the server based on a dynamically generated, self-signed certificate.
150    pub fn with_self_signed_tls(self) -> Self {
151        let CertifiedKey { cert, signing_key } = generate_simple_self_signed(["localhost".to_owned()]).unwrap();
152        let cert_chain = vec![cert.der().clone()];
153        let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der()));
154
155        let config = ServerConfig::builder()
156            .with_no_client_auth()
157            .with_single_cert(cert_chain, key)
158            .unwrap();
159
160        self.with_tls_config(config)
161    }
162}
163
164#[async_trait]
165impl Supervisable for DynamicAPIBuilder {
166    fn name(&self) -> &str {
167        match self.endpoint_type {
168            EndpointType::Unprivileged => "dynamic-unprivileged-api",
169            EndpointType::Privileged => "dynamic-privileged-api",
170        }
171    }
172
173    async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
174        // Build the static base routers.
175        //
176        // We reset the fallback route of the base gRPC router as Tonic's `unimplemented` fallback handle will collide
177        // when merging additional gRPC service routers together. We do this for every gRPC service router that we merge
178        // and then we re-apply a fallback handler that returns a standard gRPC `UNIMPLEMENTED` response when we have
179        // our final, merged gRPC router.
180        let base_http = self.http_router.clone();
181        let base_grpc = self.grpc_router.clone().routes().into_axum_router().reset_fallback();
182
183        // Create dynamic inner routers for both HTTP and gRPC sides, seeded with the static base so that the static
184        // routes are served even before any dynamic routes are asserted. The gRPC seed gets the unimplemented fallback
185        // applied so unmatched gRPC requests return the correct status from the start.
186        let (inner_http, outer_http) = create_dynamic_router(base_http.clone());
187        let (inner_grpc, outer_grpc) = create_dynamic_router(grpc_post_process(base_grpc.clone()));
188
189        let dataspace = DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
190
191        // Subscribe to all dynamic route assertions.
192        let route_assertions = dataspace.subscribe::<DynamicRoute>(IdentifierFilter::All);
193
194        // Bind the HTTP listener immediately so we fail fast on bind errors.
195        let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone())
196            .await
197            .map_err(|e| InitializationError::Failed { source: e.into() })?;
198
199        // Assert the actual bound address so other processes can discover it (e.g. when using port 0).
200        let bound_addr = listener
201            .local_addr()
202            .map_err(|e| InitializationError::Failed { source: e.into() })?;
203        dataspace.assert(BoundApiAddress(bound_addr), Identifier::named(self.name()));
204
205        let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc));
206
207        let mut http_server = HttpServer::from_listener(listener, multiplexed_service);
208        if let Some(tls_config) = self.tls_config.clone() {
209            http_server = http_server.with_tls_config(tls_config);
210        }
211        let (shutdown_handle, error_handle) = http_server.listen();
212
213        let endpoint_type = self.endpoint_type;
214        let listen_address = self.listen_address.clone();
215
216        Ok(Box::pin(async move {
217            info!("Serving {} API on {}.", endpoint_type.name(), listen_address);
218
219            run_event_loop(
220                inner_http,
221                inner_grpc,
222                base_http,
223                base_grpc,
224                route_assertions,
225                endpoint_type,
226                process_shutdown,
227                shutdown_handle,
228                error_handle,
229            )
230            .await
231        }))
232    }
233}
234
235/// A [`tower::Service`] that routes a request based on a dynamically updated [`Router`].
236///
237/// When installed as the fallback service for a top-level [`Router`], `DynamicRouterService` dynamically routing
238/// requests based on the current defined "inner" router, which itself can be hot-swapped at runtime. This allows for
239/// seamless updates to the API endpoint routing without requiring a restart of the HTTP listener or complicated
240/// configuration changes.
241#[derive(Clone)]
242struct DynamicRouterService {
243    inner_router: Arc<ArcSwap<Router>>,
244}
245
246impl DynamicRouterService {
247    fn from_inner(inner_router: &Arc<ArcSwap<Router>>) -> Self {
248        Self {
249            inner_router: Arc::clone(inner_router),
250        }
251    }
252}
253
254impl Service<http::Request<AxumBody>> for DynamicRouterService {
255    type Response = Response<AxumBody>;
256    type Error = Infallible;
257    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
258
259    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
260        Poll::Ready(Ok(()))
261    }
262
263    fn call(&mut self, request: http::Request<AxumBody>) -> Self::Future {
264        let mut router = Arc::unwrap_or_clone(self.inner_router.load_full());
265        Box::pin(async move { router.call(request).await })
266    }
267}
268
269/// Runs the event loop that listens for route assertions/retractions and hot-swaps the inner routers.
270#[allow(clippy::too_many_arguments)]
271async fn run_event_loop(
272    inner_http: Arc<ArcSwap<Router>>, inner_grpc: Arc<ArcSwap<Router>>, base_http: Router, base_grpc: Router,
273    mut route_assertions: Subscription<DynamicRoute>, endpoint_type: EndpointType,
274    mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle,
275) -> Result<(), GenericError> {
276    let mut http_handlers = FastIndexMap::default();
277    let mut grpc_handlers = FastIndexMap::default();
278
279    let shutdown = process_shutdown.wait_for_shutdown();
280    pin!(shutdown);
281    pin!(error_handle);
282
283    loop {
284        select! {
285            _ = &mut shutdown => {
286                debug!("Dynamic API shutting down.");
287                shutdown_handle.shutdown();
288                break;
289            }
290
291            maybe_err = &mut error_handle => {
292                if let Some(e) = maybe_err {
293                    return Err(GenericError::from(e));
294                }
295                break;
296            }
297
298            maybe_update = route_assertions.recv() => {
299                let Some(update) = maybe_update else {
300                    warn!("Route subscription channel closed.");
301                    break;
302                };
303
304                let mut rebuild_http = false;
305                let mut rebuild_grpc = false;
306
307                match update {
308                    AssertionUpdate::Asserted(id, route) => {
309                        if route.endpoint_type() != endpoint_type {
310                            continue;
311                        }
312
313                        match route.endpoint_protocol() {
314                            EndpointProtocol::Http => {
315                                debug!(?id, "Registering dynamic HTTP handler.");
316                                http_handlers.insert(id, route.into_router());
317
318                                rebuild_http = true;
319                            },
320                            EndpointProtocol::Grpc => {
321                                debug!(?id, "Registering dynamic gRPC handler.");
322                                grpc_handlers.insert(id, route.into_router());
323
324                                rebuild_grpc = true;
325                            },
326                        }
327                    }
328                    AssertionUpdate::Retracted(id) => {
329                        if http_handlers.swap_remove(&id).is_some() {
330                            debug!(?id, "Withdrawing dynamic HTTP handler.");
331                            rebuild_http = true;
332                        }
333
334                        if grpc_handlers.swap_remove(&id).is_some() {
335                            debug!(?id, "Withdrawing dynamic gRPC handler.");
336                            rebuild_grpc = true;
337                        }
338                    }
339                }
340
341                if rebuild_http {
342                    rebuild_router(&inner_http, &base_http, &http_handlers, http_post_process);
343                }
344
345                if rebuild_grpc {
346                    rebuild_router(
347                        &inner_grpc,
348                        &base_grpc,
349                        &grpc_handlers,
350                        grpc_post_process,
351                    );
352                }
353            }
354        }
355    }
356
357    Ok(())
358}
359
360/// Creates a dynamic router pair: a swappable inner router (seeded with `initial`) and an outer router that delegates
361/// to it.
362fn create_dynamic_router(initial: Router) -> (Arc<ArcSwap<Router>>, Router) {
363    let inner = Arc::new(ArcSwap::from_pointee(initial));
364    let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner));
365    (inner, outer)
366}
367
368/// Attempts to merge `other` into `base`, returning the merged router on success.
369///
370/// `Router::merge` panics when two routers define overlapping routes (same path and HTTP method) and axum exposes no
371/// fallible alternative. Since `Router` is opaque -- there is no public API to inspect which paths/methods a router
372/// carries -- we can't detect conflicts ahead of time.
373///
374/// To recover from the panic without losing the accumulated router state, we clone `base` before the merge attempt.
375/// The clone is passed into `catch_unwind`: if the merge panics, only the clone is in a partially mutated state and it
376/// is simply dropped. The original `base` remains intact and is returned as-is. `AssertUnwindSafe` is sound here
377/// because:
378///
379/// - The closure captures only the clone (`candidate`) and a clone of `other`. Neither aliases mutable state that
380///   outlives the closure.
381/// - The panic originates from a deterministic format string in axum's `panic_on_err!` macro -- no locks are held and
382///   no resources are leaked in the panic path.
383/// - On panic, `candidate` is dropped without further use, so any internal inconsistency is irrelevant.
384fn try_merge_router(base: &Router, id: &Identifier, other: &Router) -> Result<Router, String> {
385    let candidate = base.clone();
386    match catch_unwind(AssertUnwindSafe(|| candidate.merge(other.clone()))) {
387        Ok(merged) => Ok(merged),
388        Err(payload) => {
389            let reason = payload
390                .downcast_ref::<String>()
391                .map(|s| s.as_str())
392                .or_else(|| payload.downcast_ref::<&str>().copied())
393                .unwrap_or("unknown");
394            Err(format!("failed to merge dynamic handler {id:?}: {reason}"))
395        }
396    }
397}
398
399/// Rebuilds the merged inner router from the static `base` and all currently registered dynamic handlers, applies
400/// `post_process` to the merged router, then stores the result in the [`ArcSwap`].
401fn rebuild_router(
402    inner_router: &Arc<ArcSwap<Router>>, base: &Router, handlers: &FastIndexMap<Identifier, Router>,
403    post_process: fn(Router) -> Router,
404) {
405    let mut merged = base.clone();
406    let mut skipped = 0usize;
407
408    for (id, router) in handlers.iter() {
409        let resetable = router.clone().reset_fallback();
410        match try_merge_router(&merged, id, &resetable) {
411            Ok(new_merged) => merged = new_merged,
412            Err(reason) => {
413                warn!(%reason, "Skipping dynamic handler due to overlapping route.");
414                skipped += 1;
415            }
416        }
417    }
418
419    let merged = post_process(merged);
420    inner_router.store(Arc::new(merged));
421    debug!(handler_count = handlers.len(), skipped, "Rebuilt inner router.");
422}
423
424fn http_post_process(router: Router) -> Router {
425    router
426}
427
428/// Adds a fallback handler that returns a standard gRPC `UNIMPLEMENTED` response when no other handler matches.
429fn grpc_post_process(router: Router) -> Router {
430    router.fallback(grpc_unimplemented)
431}
432
433async fn grpc_unimplemented() -> Response<AxumBody> {
434    tonic::Status::unimplemented("").into_http()
435}
436
437#[cfg(test)]
438mod tests {
439    use std::{net::SocketAddr, time::Duration};
440
441    use async_trait::async_trait;
442    use axum::Router;
443    use http_body_util::{BodyExt as _, Empty};
444    use hyper::{body::Bytes, StatusCode};
445    use hyper_util::{client::legacy::Client, rt::TokioExecutor};
446    use saluki_api::{APIHandler, DynamicRoute, EndpointType};
447    use saluki_core::runtime::{
448        state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter},
449        InitializationError, ProcessShutdown, Supervisable, Supervisor, SupervisorFuture,
450    };
451    use tokio::{
452        sync::{mpsc, oneshot},
453        task::JoinHandle,
454        time::{sleep, timeout, Instant},
455    };
456
457    use super::*;
458
/// Minimal API handler for tests: answers `GET` requests on `path` with the fixed `body`.
struct SimpleHandler {
    /// Path the single route is registered under.
    path: &'static str,
    /// Response body returned by the route.
    body: &'static str,
}
463
464    impl APIHandler for SimpleHandler {
465        type State = ();
466
467        fn generate_initial_state(&self) -> Self::State {}
468
469        fn generate_routes(&self) -> Router<Self::State> {
470            let body = self.body;
471            Router::new().route(self.path, axum::routing::get(move || async move { body }))
472        }
473    }
474
475    enum RouteCommand {
476        Assert { id: Identifier, route: DynamicRoute },
477        Retract { id: Identifier },
478    }
479
480    struct RouteAsserter {
481        commands_rx: std::sync::Mutex<Option<mpsc::Receiver<RouteCommand>>>,
482        addr_tx: std::sync::Mutex<Option<oneshot::Sender<SocketAddr>>>,
483        endpoint_type: EndpointType,
484    }
485
486    #[async_trait]
487    impl Supervisable for RouteAsserter {
488        fn name(&self) -> &str {
489            "route-asserter"
490        }
491
492        async fn initialize(
493            &self, mut process_shutdown: ProcessShutdown,
494        ) -> Result<SupervisorFuture, InitializationError> {
495            let mut commands_rx =
496                self.commands_rx
497                    .lock()
498                    .unwrap()
499                    .take()
500                    .ok_or_else(|| InitializationError::Failed {
501                        source: generic_error!("RouteAsserter can only be initialized once"),
502                    })?;
503            let addr_tx = self.addr_tx.lock().unwrap().take();
504            let endpoint_type = self.endpoint_type;
505
506            Ok(Box::pin(async move {
507                let dataspace =
508                    DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
509
510                // Wait for the DynamicAPIBuilder to assert its bound address.
511                let bound_addr_name = match endpoint_type {
512                    EndpointType::Unprivileged => "dynamic-unprivileged-api",
513                    EndpointType::Privileged => "dynamic-privileged-api",
514                };
515                let mut addr_sub =
516                    dataspace.subscribe::<BoundApiAddress>(IdentifierFilter::exact(Identifier::named(bound_addr_name)));
517
518                let addr = match addr_sub.recv().await {
519                    Some(AssertionUpdate::Asserted(_, BoundApiAddress(mut addr))) => {
520                        // Convert 0.0.0.0 to 127.0.0.1 so the test client can connect.
521                        if addr.ip().is_unspecified() {
522                            addr.set_ip(std::net::Ipv4Addr::LOCALHOST.into());
523                        }
524                        addr
525                    }
526                    other => return Err(generic_error!("unexpected bound address update: {:?}", other)),
527                };
528
529                if let Some(tx) = addr_tx {
530                    let _ = tx.send(addr);
531                }
532
533                // Process route commands until shutdown.
534                let shutdown = process_shutdown.wait_for_shutdown();
535                tokio::pin!(shutdown);
536
537                loop {
538                    tokio::select! {
539                        _ = &mut shutdown => break,
540                        cmd = commands_rx.recv() => {
541                            let Some(cmd) = cmd else { break };
542                            match cmd {
543                                RouteCommand::Assert { id, route } => {
544                                    dataspace.assert(route, id);
545                                }
546                                RouteCommand::Retract { id } => {
547                                    dataspace.retract::<DynamicRoute>(id);
548                                }
549                            }
550                        }
551                    }
552                }
553
554                Ok(())
555            }))
556        }
557    }
558
559    struct TestHarness {
560        addr: SocketAddr,
561        commands: mpsc::Sender<RouteCommand>,
562        _shutdown: oneshot::Sender<()>,
563        _handle: JoinHandle<()>,
564    }
565
566    impl TestHarness {
567        async fn assert_route(&self, id: impl Into<Identifier>, route: DynamicRoute) {
568            self.commands
569                .send(RouteCommand::Assert { id: id.into(), route })
570                .await
571                .unwrap();
572        }
573
574        async fn retract_route(&self, id: impl Into<Identifier>) {
575            self.commands
576                .send(RouteCommand::Retract { id: id.into() })
577                .await
578                .unwrap();
579        }
580    }
581
582    async fn setup_test_harness(endpoint_type: EndpointType) -> TestHarness {
583        setup_test_harness_with(endpoint_type, |b| b).await
584    }
585
586    async fn setup_test_harness_with<F>(endpoint_type: EndpointType, configure: F) -> TestHarness
587    where
588        F: FnOnce(DynamicAPIBuilder) -> DynamicAPIBuilder,
589    {
590        let (commands_tx, commands_rx) = mpsc::channel(16);
591        let (addr_tx, addr_rx) = oneshot::channel();
592
593        let api_builder = configure(DynamicAPIBuilder::new(endpoint_type, ListenAddress::any_tcp(0)));
594        let route_asserter = RouteAsserter {
595            commands_rx: std::sync::Mutex::new(Some(commands_rx)),
596            addr_tx: std::sync::Mutex::new(Some(addr_tx)),
597            endpoint_type,
598        };
599
600        let mut sup = Supervisor::new("test-dynamic-api").unwrap();
601        sup.add_worker(api_builder);
602        sup.add_worker(route_asserter);
603
604        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
605        let handle = tokio::spawn(async move {
606            let _ = sup.run_with_shutdown(shutdown_rx).await;
607        });
608
609        let addr = timeout(Duration::from_secs(5), addr_rx)
610            .await
611            .expect("timed out waiting for bound address")
612            .expect("addr channel closed");
613
614        TestHarness {
615            addr,
616            commands: commands_tx,
617            _shutdown: shutdown_tx,
618            _handle: handle,
619        }
620    }
621
622    async fn http_get(addr: SocketAddr, path: &str) -> (StatusCode, String) {
623        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
624        let uri = format!("http://{}{}", addr, path);
625        let resp = client.get(uri.parse().unwrap()).await.unwrap();
626        let status = resp.status();
627        let body = resp.into_body().collect().await.unwrap().to_bytes();
628        let body_str = String::from_utf8_lossy(&body).into_owned();
629        (status, body_str)
630    }
631
632    async fn grpc_post(addr: SocketAddr, path: &str) -> (StatusCode, http::HeaderMap) {
633        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
634        let uri: hyper::Uri = format!("http://{}{}", addr, path).parse().unwrap();
635        let req = hyper::Request::builder()
636            .uri(uri)
637            .method(hyper::Method::POST)
638            .header(hyper::header::CONTENT_TYPE, "application/grpc")
639            .body(Empty::<Bytes>::new())
640            .unwrap();
641        let resp = client.request(req).await.unwrap();
642        (resp.status(), resp.headers().clone())
643    }
644
645    async fn assert_status_eventually(addr: SocketAddr, path: &str, expected: StatusCode) -> String {
646        let deadline = Instant::now() + Duration::from_secs(2);
647        loop {
648            let (status, body) = http_get(addr, path).await;
649            if status == expected {
650                return body;
651            }
652            if Instant::now() > deadline {
653                panic!("expected {} for {} but got {}", expected, path, status);
654            }
655            sleep(Duration::from_millis(50)).await;
656        }
657    }
658
659    // -- Tests ---------------------------------------------------------------------------
660
661    #[tokio::test]
662    async fn serves_asserted_http_route() {
663        let harness = setup_test_harness(EndpointType::Unprivileged).await;
664
665        let route = DynamicRoute::http(
666            EndpointType::Unprivileged,
667            SimpleHandler {
668                path: "/health",
669                body: "ok",
670            },
671        );
672        harness.assert_route("health", route).await;
673
674        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
675        assert_eq!(body, "ok");
676    }
677
678    #[tokio::test]
679    async fn returns_404_for_unknown_route() {
680        let harness = setup_test_harness(EndpointType::Unprivileged).await;
681        let (status, _) = http_get(harness.addr, "/nonexistent").await;
682        assert_eq!(status, StatusCode::NOT_FOUND);
683    }
684
685    #[tokio::test]
686    async fn route_retraction_removes_route() {
687        let harness = setup_test_harness(EndpointType::Unprivileged).await;
688
689        let route = DynamicRoute::http(
690            EndpointType::Unprivileged,
691            SimpleHandler {
692                path: "/temp",
693                body: "temporary",
694            },
695        );
696        harness.assert_route("temp", route).await;
697        assert_status_eventually(harness.addr, "/temp", StatusCode::OK).await;
698
699        harness.retract_route("temp").await;
700        assert_status_eventually(harness.addr, "/temp", StatusCode::NOT_FOUND).await;
701    }
702
703    #[tokio::test]
704    async fn multiple_routes_independent_lifecycle() {
705        let harness = setup_test_harness(EndpointType::Unprivileged).await;
706
707        let route_a = DynamicRoute::http(
708            EndpointType::Unprivileged,
709            SimpleHandler {
710                path: "/a",
711                body: "alpha",
712            },
713        );
714        let route_b = DynamicRoute::http(
715            EndpointType::Unprivileged,
716            SimpleHandler {
717                path: "/b",
718                body: "bravo",
719            },
720        );
721        harness.assert_route("a", route_a).await;
722        harness.assert_route("b", route_b).await;
723
724        assert_status_eventually(harness.addr, "/a", StatusCode::OK).await;
725        assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
726
727        // Retract only /a -- /b should remain.
728        harness.retract_route("a").await;
729        assert_status_eventually(harness.addr, "/a", StatusCode::NOT_FOUND).await;
730
731        let body = assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
732        assert_eq!(body, "bravo");
733    }
734
735    #[tokio::test]
736    async fn ignores_routes_for_different_endpoint_type() {
737        let harness = setup_test_harness(EndpointType::Unprivileged).await;
738
739        // Assert a Privileged route on an Unprivileged server -- should be ignored.
740        let wrong_route = DynamicRoute::http(
741            EndpointType::Privileged,
742            SimpleHandler {
743                path: "/secret",
744                body: "secret",
745            },
746        );
747        harness.assert_route("secret", wrong_route).await;
748
749        let (status, _) = http_get(harness.addr, "/secret").await;
750        assert_eq!(status, StatusCode::NOT_FOUND);
751
752        // Now assert the same path with the correct endpoint type.
753        let right_route = DynamicRoute::http(
754            EndpointType::Unprivileged,
755            SimpleHandler {
756                path: "/secret",
757                body: "not secret",
758            },
759        );
760        harness.assert_route("secret-unpriv", right_route).await;
761
762        let body = assert_status_eventually(harness.addr, "/secret", StatusCode::OK).await;
763        assert_eq!(body, "not secret");
764    }
765
766    #[tokio::test]
767    async fn overlapping_routes_do_not_crash_server() {
768        let harness = setup_test_harness(EndpointType::Unprivileged).await;
769
770        // Assert a route at /health with identifier "health-1".
771        let route_1 = DynamicRoute::http(
772            EndpointType::Unprivileged,
773            SimpleHandler {
774                path: "/health",
775                body: "health-1",
776            },
777        );
778        harness.assert_route("health-1", route_1).await;
779        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
780        assert_eq!(body, "health-1");
781
782        // Assert a DIFFERENT identifier with the SAME path/method. Previously this caused a panic
783        // in rebuild_router. The server should remain alive with first-writer-wins semantics.
784        let route_2 = DynamicRoute::http(
785            EndpointType::Unprivileged,
786            SimpleHandler {
787                path: "/health",
788                body: "health-2",
789            },
790        );
791        harness.assert_route("health-2", route_2).await;
792
793        // Give the event loop time to process and rebuild.
794        sleep(Duration::from_millis(200)).await;
795
796        // Server is still alive; first handler wins.
797        let (status, body) = http_get(harness.addr, "/health").await;
798        assert_eq!(status, StatusCode::OK);
799        assert_eq!(body, "health-1");
800
801        // Non-overlapping routes are unaffected.
802        let route_info = DynamicRoute::http(
803            EndpointType::Unprivileged,
804            SimpleHandler {
805                path: "/info",
806                body: "info",
807            },
808        );
809        harness.assert_route("info", route_info).await;
810        let body = assert_status_eventually(harness.addr, "/info", StatusCode::OK).await;
811        assert_eq!(body, "info");
812
813        // Retract the first /health handler -- the previously skipped second handler should now
814        // become active since the conflict no longer exists.
815        harness.retract_route("health-1").await;
816        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
817        assert_eq!(body, "health-2");
818    }
819
820    #[tokio::test]
821    async fn overlapping_route_retraction_then_reassertion() {
822        let harness = setup_test_harness(EndpointType::Unprivileged).await;
823
824        // Assert two overlapping handlers.
825        let route_a = DynamicRoute::http(
826            EndpointType::Unprivileged,
827            SimpleHandler {
828                path: "/overlap",
829                body: "a",
830            },
831        );
832        let route_b = DynamicRoute::http(
833            EndpointType::Unprivileged,
834            SimpleHandler {
835                path: "/overlap",
836                body: "b",
837            },
838        );
839        harness.assert_route("ov-a", route_a).await;
840        harness.assert_route("ov-b", route_b).await;
841
842        // Server alive; first writer wins.
843        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
844        assert_eq!(body, "a");
845
846        // Retract both.
847        harness.retract_route("ov-a").await;
848        harness.retract_route("ov-b").await;
849        assert_status_eventually(harness.addr, "/overlap", StatusCode::NOT_FOUND).await;
850
851        // Re-assert a single handler -- should work cleanly.
852        let route_c = DynamicRoute::http(
853            EndpointType::Unprivileged,
854            SimpleHandler {
855                path: "/overlap",
856                body: "c",
857            },
858        );
859        harness.assert_route("ov-c", route_c).await;
860        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
861        assert_eq!(body, "c");
862    }
863
864    #[tokio::test]
865    async fn static_handler_served_without_dynamic_routes() {
866        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
867            b.with_handler(SimpleHandler {
868                path: "/static",
869                body: "static",
870            })
871        })
872        .await;
873
874        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
875        assert_eq!(body, "static");
876    }
877
878    #[tokio::test]
879    async fn static_and_dynamic_routes_coexist() {
880        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
881            b.with_handler(SimpleHandler {
882                path: "/static",
883                body: "static",
884            })
885        })
886        .await;
887
888        // Static route is served immediately.
889        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
890        assert_eq!(body, "static");
891
892        // Add a dynamic route on a different path -- both should serve.
893        let dynamic_route = DynamicRoute::http(
894            EndpointType::Unprivileged,
895            SimpleHandler {
896                path: "/dynamic",
897                body: "dynamic",
898            },
899        );
900        harness.assert_route("dyn", dynamic_route).await;
901
902        let body = assert_status_eventually(harness.addr, "/dynamic", StatusCode::OK).await;
903        assert_eq!(body, "dynamic");
904
905        let (status, body) = http_get(harness.addr, "/static").await;
906        assert_eq!(status, StatusCode::OK);
907        assert_eq!(body, "static");
908
909        // Retracting the dynamic route leaves the static route untouched.
910        harness.retract_route("dyn").await;
911        assert_status_eventually(harness.addr, "/dynamic", StatusCode::NOT_FOUND).await;
912
913        let (status, body) = http_get(harness.addr, "/static").await;
914        assert_eq!(status, StatusCode::OK);
915        assert_eq!(body, "static");
916    }
917
918    #[tokio::test]
919    async fn unknown_grpc_method_returns_unimplemented() {
920        let harness = setup_test_harness(EndpointType::Unprivileged).await;
921        let (status, headers) = grpc_post(harness.addr, "/some.Service/Method").await;
922
923        // gRPC errors are reported with HTTP 200 plus a `grpc-status` header. UNIMPLEMENTED is code 12.
924        assert_eq!(status, StatusCode::OK);
925        let grpc_status = headers.get("grpc-status").and_then(|v| v.to_str().ok());
926        assert_eq!(grpc_status, Some("12"));
927    }
928
929    #[tokio::test]
930    async fn static_route_wins_overlap_with_dynamic() {
931        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
932            b.with_handler(SimpleHandler {
933                path: "/overlap",
934                body: "static",
935            })
936        })
937        .await;
938
939        // Static route is served.
940        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
941        assert_eq!(body, "static");
942
943        // Asserting a dynamic route at the same path is skipped due to overlap -- static still wins.
944        let dynamic_route = DynamicRoute::http(
945            EndpointType::Unprivileged,
946            SimpleHandler {
947                path: "/overlap",
948                body: "dynamic",
949            },
950        );
951        harness.assert_route("dyn-overlap", dynamic_route).await;
952
953        sleep(Duration::from_millis(200)).await;
954
955        let (status, body) = http_get(harness.addr, "/overlap").await;
956        assert_eq!(status, StatusCode::OK);
957        assert_eq!(body, "static");
958    }
959}