1use std::{
8 convert::Infallible,
9 error::Error,
10 future::Future,
11 net::SocketAddr,
12 panic::{catch_unwind, AssertUnwindSafe},
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16};
17
18use arc_swap::ArcSwap;
19use async_trait::async_trait;
20use axum::{body::Body as AxumBody, Router};
21use http::{Request, Response};
22use rcgen::{generate_simple_self_signed, CertifiedKey};
23use rustls::{pki_types::PrivateKeyDer, ServerConfig};
24use rustls_pki_types::PrivatePkcs8KeyDer;
25use saluki_api::{APIHandler, DynamicRoute, EndpointProtocol, EndpointType};
26use saluki_common::collections::FastIndexMap;
27use saluki_core::runtime::{
28 state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter, Subscription},
29 InitializationError, ProcessShutdown, Supervisable, SupervisorFuture,
30};
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::{
33 listener::ConnectionOrientedListener,
34 server::{
35 http::{ErrorHandle, HttpServer, ShutdownHandle},
36 multiplex_service::MultiplexService,
37 },
38 util::hyper::TowerToHyperService,
39 ListenAddress,
40};
41use tokio::{pin, select};
42use tonic::{body::Body as GrpcBody, server::NamedService, service::RoutesBuilder};
43use tower::Service;
44use tracing::{debug, info, warn};
45
46#[derive(Clone, Debug)]
50pub struct BoundApiAddress(pub SocketAddr);
51
52pub struct DynamicAPIBuilder {
79 endpoint_type: EndpointType,
80 listen_address: ListenAddress,
81 tls_config: Option<ServerConfig>,
82 http_router: Router,
83 grpc_router: RoutesBuilder,
84}
85
86impl DynamicAPIBuilder {
87 pub fn new(endpoint_type: EndpointType, listen_address: ListenAddress) -> Self {
89 Self {
90 endpoint_type,
91 listen_address,
92 tls_config: None,
93 http_router: Router::new(),
94 grpc_router: RoutesBuilder::default(),
95 }
96 }
97
98 pub fn with_handler<H>(mut self, handler: H) -> Self
103 where
104 H: APIHandler,
105 {
106 let handler_router = handler.generate_routes();
107 let handler_state = handler.generate_initial_state();
108 self.http_router = self.http_router.merge(handler_router.with_state(handler_state));
109 self
110 }
111
112 pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
117 where
118 H: APIHandler,
119 {
120 if let Some(handler) = handler {
121 self.with_handler(handler)
122 } else {
123 self
124 }
125 }
126
127 pub fn with_grpc_service<S>(mut self, svc: S) -> Self
129 where
130 S: Service<Request<GrpcBody>, Response = Response<GrpcBody>, Error = Infallible>
131 + NamedService
132 + Clone
133 + Send
134 + Sync
135 + 'static,
136 S::Future: Send + 'static,
137 S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
138 {
139 self.grpc_router.add_service(svc);
140 self
141 }
142
143 pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
145 self.tls_config = Some(config);
146 self
147 }
148
149 pub fn with_self_signed_tls(self) -> Self {
151 let CertifiedKey { cert, signing_key } = generate_simple_self_signed(["localhost".to_owned()]).unwrap();
152 let cert_chain = vec![cert.der().clone()];
153 let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der()));
154
155 let config = ServerConfig::builder()
156 .with_no_client_auth()
157 .with_single_cert(cert_chain, key)
158 .unwrap();
159
160 self.with_tls_config(config)
161 }
162}
163
164#[async_trait]
165impl Supervisable for DynamicAPIBuilder {
166 fn name(&self) -> &str {
167 match self.endpoint_type {
168 EndpointType::Unprivileged => "dynamic-unprivileged-api",
169 EndpointType::Privileged => "dynamic-privileged-api",
170 }
171 }
172
173 async fn initialize(&self, process_shutdown: ProcessShutdown) -> Result<SupervisorFuture, InitializationError> {
174 let base_http = self.http_router.clone();
181 let base_grpc = self.grpc_router.clone().routes().into_axum_router().reset_fallback();
182
183 let (inner_http, outer_http) = create_dynamic_router(base_http.clone());
187 let (inner_grpc, outer_grpc) = create_dynamic_router(grpc_post_process(base_grpc.clone()));
188
189 let dataspace = DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
190
191 let route_assertions = dataspace.subscribe::<DynamicRoute>(IdentifierFilter::All);
193
194 let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone())
196 .await
197 .map_err(|e| InitializationError::Failed { source: e.into() })?;
198
199 let bound_addr = listener
201 .local_addr()
202 .map_err(|e| InitializationError::Failed { source: e.into() })?;
203 dataspace.assert(BoundApiAddress(bound_addr), Identifier::named(self.name()));
204
205 let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc));
206
207 let mut http_server = HttpServer::from_listener(listener, multiplexed_service);
208 if let Some(tls_config) = self.tls_config.clone() {
209 http_server = http_server.with_tls_config(tls_config);
210 }
211 let (shutdown_handle, error_handle) = http_server.listen();
212
213 let endpoint_type = self.endpoint_type;
214 let listen_address = self.listen_address.clone();
215
216 Ok(Box::pin(async move {
217 info!("Serving {} API on {}.", endpoint_type.name(), listen_address);
218
219 run_event_loop(
220 inner_http,
221 inner_grpc,
222 base_http,
223 base_grpc,
224 route_assertions,
225 endpoint_type,
226 process_shutdown,
227 shutdown_handle,
228 error_handle,
229 )
230 .await
231 }))
232 }
233}
234
235#[derive(Clone)]
242struct DynamicRouterService {
243 inner_router: Arc<ArcSwap<Router>>,
244}
245
246impl DynamicRouterService {
247 fn from_inner(inner_router: &Arc<ArcSwap<Router>>) -> Self {
248 Self {
249 inner_router: Arc::clone(inner_router),
250 }
251 }
252}
253
254impl Service<http::Request<AxumBody>> for DynamicRouterService {
255 type Response = Response<AxumBody>;
256 type Error = Infallible;
257 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
258
259 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
260 Poll::Ready(Ok(()))
261 }
262
263 fn call(&mut self, request: http::Request<AxumBody>) -> Self::Future {
264 let mut router = Arc::unwrap_or_clone(self.inner_router.load_full());
265 Box::pin(async move { router.call(request).await })
266 }
267}
268
269#[allow(clippy::too_many_arguments)]
271async fn run_event_loop(
272 inner_http: Arc<ArcSwap<Router>>, inner_grpc: Arc<ArcSwap<Router>>, base_http: Router, base_grpc: Router,
273 mut route_assertions: Subscription<DynamicRoute>, endpoint_type: EndpointType,
274 mut process_shutdown: ProcessShutdown, shutdown_handle: ShutdownHandle, error_handle: ErrorHandle,
275) -> Result<(), GenericError> {
276 let mut http_handlers = FastIndexMap::default();
277 let mut grpc_handlers = FastIndexMap::default();
278
279 let shutdown = process_shutdown.wait_for_shutdown();
280 pin!(shutdown);
281 pin!(error_handle);
282
283 loop {
284 select! {
285 _ = &mut shutdown => {
286 debug!("Dynamic API shutting down.");
287 shutdown_handle.shutdown();
288 break;
289 }
290
291 maybe_err = &mut error_handle => {
292 if let Some(e) = maybe_err {
293 return Err(GenericError::from(e));
294 }
295 break;
296 }
297
298 maybe_update = route_assertions.recv() => {
299 let Some(update) = maybe_update else {
300 warn!("Route subscription channel closed.");
301 break;
302 };
303
304 let mut rebuild_http = false;
305 let mut rebuild_grpc = false;
306
307 match update {
308 AssertionUpdate::Asserted(id, route) => {
309 if route.endpoint_type() != endpoint_type {
310 continue;
311 }
312
313 match route.endpoint_protocol() {
314 EndpointProtocol::Http => {
315 debug!(?id, "Registering dynamic HTTP handler.");
316 http_handlers.insert(id, route.into_router());
317
318 rebuild_http = true;
319 },
320 EndpointProtocol::Grpc => {
321 debug!(?id, "Registering dynamic gRPC handler.");
322 grpc_handlers.insert(id, route.into_router());
323
324 rebuild_grpc = true;
325 },
326 }
327 }
328 AssertionUpdate::Retracted(id) => {
329 if http_handlers.swap_remove(&id).is_some() {
330 debug!(?id, "Withdrawing dynamic HTTP handler.");
331 rebuild_http = true;
332 }
333
334 if grpc_handlers.swap_remove(&id).is_some() {
335 debug!(?id, "Withdrawing dynamic gRPC handler.");
336 rebuild_grpc = true;
337 }
338 }
339 }
340
341 if rebuild_http {
342 rebuild_router(&inner_http, &base_http, &http_handlers, http_post_process);
343 }
344
345 if rebuild_grpc {
346 rebuild_router(
347 &inner_grpc,
348 &base_grpc,
349 &grpc_handlers,
350 grpc_post_process,
351 );
352 }
353 }
354 }
355 }
356
357 Ok(())
358}
359
360fn create_dynamic_router(initial: Router) -> (Arc<ArcSwap<Router>>, Router) {
363 let inner = Arc::new(ArcSwap::from_pointee(initial));
364 let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner));
365 (inner, outer)
366}
367
368fn try_merge_router(base: &Router, id: &Identifier, other: &Router) -> Result<Router, String> {
385 let candidate = base.clone();
386 match catch_unwind(AssertUnwindSafe(|| candidate.merge(other.clone()))) {
387 Ok(merged) => Ok(merged),
388 Err(payload) => {
389 let reason = payload
390 .downcast_ref::<String>()
391 .map(|s| s.as_str())
392 .or_else(|| payload.downcast_ref::<&str>().copied())
393 .unwrap_or("unknown");
394 Err(format!("failed to merge dynamic handler {id:?}: {reason}"))
395 }
396 }
397}
398
399fn rebuild_router(
402 inner_router: &Arc<ArcSwap<Router>>, base: &Router, handlers: &FastIndexMap<Identifier, Router>,
403 post_process: fn(Router) -> Router,
404) {
405 let mut merged = base.clone();
406 let mut skipped = 0usize;
407
408 for (id, router) in handlers.iter() {
409 let resetable = router.clone().reset_fallback();
410 match try_merge_router(&merged, id, &resetable) {
411 Ok(new_merged) => merged = new_merged,
412 Err(reason) => {
413 warn!(%reason, "Skipping dynamic handler due to overlapping route.");
414 skipped += 1;
415 }
416 }
417 }
418
419 let merged = post_process(merged);
420 inner_router.store(Arc::new(merged));
421 debug!(handler_count = handlers.len(), skipped, "Rebuilt inner router.");
422}
423
424fn http_post_process(router: Router) -> Router {
425 router
426}
427
428fn grpc_post_process(router: Router) -> Router {
430 router.fallback(grpc_unimplemented)
431}
432
433async fn grpc_unimplemented() -> Response<AxumBody> {
434 tonic::Status::unimplemented("").into_http()
435}
436
437#[cfg(test)]
438mod tests {
439 use std::{net::SocketAddr, time::Duration};
440
441 use async_trait::async_trait;
442 use axum::Router;
443 use http_body_util::{BodyExt as _, Empty};
444 use hyper::{body::Bytes, StatusCode};
445 use hyper_util::{client::legacy::Client, rt::TokioExecutor};
446 use saluki_api::{APIHandler, DynamicRoute, EndpointType};
447 use saluki_core::runtime::{
448 state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter},
449 InitializationError, ProcessShutdown, Supervisable, Supervisor, SupervisorFuture,
450 };
451 use tokio::{
452 sync::{mpsc, oneshot},
453 task::JoinHandle,
454 time::{sleep, timeout, Instant},
455 };
456
457 use super::*;
458
459 struct SimpleHandler {
460 path: &'static str,
461 body: &'static str,
462 }
463
464 impl APIHandler for SimpleHandler {
465 type State = ();
466
467 fn generate_initial_state(&self) -> Self::State {}
468
469 fn generate_routes(&self) -> Router<Self::State> {
470 let body = self.body;
471 Router::new().route(self.path, axum::routing::get(move || async move { body }))
472 }
473 }
474
475 enum RouteCommand {
476 Assert { id: Identifier, route: DynamicRoute },
477 Retract { id: Identifier },
478 }
479
480 struct RouteAsserter {
481 commands_rx: std::sync::Mutex<Option<mpsc::Receiver<RouteCommand>>>,
482 addr_tx: std::sync::Mutex<Option<oneshot::Sender<SocketAddr>>>,
483 endpoint_type: EndpointType,
484 }
485
486 #[async_trait]
487 impl Supervisable for RouteAsserter {
488 fn name(&self) -> &str {
489 "route-asserter"
490 }
491
492 async fn initialize(
493 &self, mut process_shutdown: ProcessShutdown,
494 ) -> Result<SupervisorFuture, InitializationError> {
495 let mut commands_rx =
496 self.commands_rx
497 .lock()
498 .unwrap()
499 .take()
500 .ok_or_else(|| InitializationError::Failed {
501 source: generic_error!("RouteAsserter can only be initialized once"),
502 })?;
503 let addr_tx = self.addr_tx.lock().unwrap().take();
504 let endpoint_type = self.endpoint_type;
505
506 Ok(Box::pin(async move {
507 let dataspace =
508 DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
509
510 let bound_addr_name = match endpoint_type {
512 EndpointType::Unprivileged => "dynamic-unprivileged-api",
513 EndpointType::Privileged => "dynamic-privileged-api",
514 };
515 let mut addr_sub =
516 dataspace.subscribe::<BoundApiAddress>(IdentifierFilter::exact(Identifier::named(bound_addr_name)));
517
518 let addr = match addr_sub.recv().await {
519 Some(AssertionUpdate::Asserted(_, BoundApiAddress(mut addr))) => {
520 if addr.ip().is_unspecified() {
522 addr.set_ip(std::net::Ipv4Addr::LOCALHOST.into());
523 }
524 addr
525 }
526 other => return Err(generic_error!("unexpected bound address update: {:?}", other)),
527 };
528
529 if let Some(tx) = addr_tx {
530 let _ = tx.send(addr);
531 }
532
533 let shutdown = process_shutdown.wait_for_shutdown();
535 tokio::pin!(shutdown);
536
537 loop {
538 tokio::select! {
539 _ = &mut shutdown => break,
540 cmd = commands_rx.recv() => {
541 let Some(cmd) = cmd else { break };
542 match cmd {
543 RouteCommand::Assert { id, route } => {
544 dataspace.assert(route, id);
545 }
546 RouteCommand::Retract { id } => {
547 dataspace.retract::<DynamicRoute>(id);
548 }
549 }
550 }
551 }
552 }
553
554 Ok(())
555 }))
556 }
557 }
558
559 struct TestHarness {
560 addr: SocketAddr,
561 commands: mpsc::Sender<RouteCommand>,
562 _shutdown: oneshot::Sender<()>,
563 _handle: JoinHandle<()>,
564 }
565
566 impl TestHarness {
567 async fn assert_route(&self, id: impl Into<Identifier>, route: DynamicRoute) {
568 self.commands
569 .send(RouteCommand::Assert { id: id.into(), route })
570 .await
571 .unwrap();
572 }
573
574 async fn retract_route(&self, id: impl Into<Identifier>) {
575 self.commands
576 .send(RouteCommand::Retract { id: id.into() })
577 .await
578 .unwrap();
579 }
580 }
581
582 async fn setup_test_harness(endpoint_type: EndpointType) -> TestHarness {
583 setup_test_harness_with(endpoint_type, |b| b).await
584 }
585
586 async fn setup_test_harness_with<F>(endpoint_type: EndpointType, configure: F) -> TestHarness
587 where
588 F: FnOnce(DynamicAPIBuilder) -> DynamicAPIBuilder,
589 {
590 let (commands_tx, commands_rx) = mpsc::channel(16);
591 let (addr_tx, addr_rx) = oneshot::channel();
592
593 let api_builder = configure(DynamicAPIBuilder::new(endpoint_type, ListenAddress::any_tcp(0)));
594 let route_asserter = RouteAsserter {
595 commands_rx: std::sync::Mutex::new(Some(commands_rx)),
596 addr_tx: std::sync::Mutex::new(Some(addr_tx)),
597 endpoint_type,
598 };
599
600 let mut sup = Supervisor::new("test-dynamic-api").unwrap();
601 sup.add_worker(api_builder);
602 sup.add_worker(route_asserter);
603
604 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
605 let handle = tokio::spawn(async move {
606 let _ = sup.run_with_shutdown(shutdown_rx).await;
607 });
608
609 let addr = timeout(Duration::from_secs(5), addr_rx)
610 .await
611 .expect("timed out waiting for bound address")
612 .expect("addr channel closed");
613
614 TestHarness {
615 addr,
616 commands: commands_tx,
617 _shutdown: shutdown_tx,
618 _handle: handle,
619 }
620 }
621
622 async fn http_get(addr: SocketAddr, path: &str) -> (StatusCode, String) {
623 let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
624 let uri = format!("http://{}{}", addr, path);
625 let resp = client.get(uri.parse().unwrap()).await.unwrap();
626 let status = resp.status();
627 let body = resp.into_body().collect().await.unwrap().to_bytes();
628 let body_str = String::from_utf8_lossy(&body).into_owned();
629 (status, body_str)
630 }
631
632 async fn grpc_post(addr: SocketAddr, path: &str) -> (StatusCode, http::HeaderMap) {
633 let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
634 let uri: hyper::Uri = format!("http://{}{}", addr, path).parse().unwrap();
635 let req = hyper::Request::builder()
636 .uri(uri)
637 .method(hyper::Method::POST)
638 .header(hyper::header::CONTENT_TYPE, "application/grpc")
639 .body(Empty::<Bytes>::new())
640 .unwrap();
641 let resp = client.request(req).await.unwrap();
642 (resp.status(), resp.headers().clone())
643 }
644
645 async fn assert_status_eventually(addr: SocketAddr, path: &str, expected: StatusCode) -> String {
646 let deadline = Instant::now() + Duration::from_secs(2);
647 loop {
648 let (status, body) = http_get(addr, path).await;
649 if status == expected {
650 return body;
651 }
652 if Instant::now() > deadline {
653 panic!("expected {} for {} but got {}", expected, path, status);
654 }
655 sleep(Duration::from_millis(50)).await;
656 }
657 }
658
659 #[tokio::test]
662 async fn serves_asserted_http_route() {
663 let harness = setup_test_harness(EndpointType::Unprivileged).await;
664
665 let route = DynamicRoute::http(
666 EndpointType::Unprivileged,
667 SimpleHandler {
668 path: "/health",
669 body: "ok",
670 },
671 );
672 harness.assert_route("health", route).await;
673
674 let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
675 assert_eq!(body, "ok");
676 }
677
678 #[tokio::test]
679 async fn returns_404_for_unknown_route() {
680 let harness = setup_test_harness(EndpointType::Unprivileged).await;
681 let (status, _) = http_get(harness.addr, "/nonexistent").await;
682 assert_eq!(status, StatusCode::NOT_FOUND);
683 }
684
685 #[tokio::test]
686 async fn route_retraction_removes_route() {
687 let harness = setup_test_harness(EndpointType::Unprivileged).await;
688
689 let route = DynamicRoute::http(
690 EndpointType::Unprivileged,
691 SimpleHandler {
692 path: "/temp",
693 body: "temporary",
694 },
695 );
696 harness.assert_route("temp", route).await;
697 assert_status_eventually(harness.addr, "/temp", StatusCode::OK).await;
698
699 harness.retract_route("temp").await;
700 assert_status_eventually(harness.addr, "/temp", StatusCode::NOT_FOUND).await;
701 }
702
703 #[tokio::test]
704 async fn multiple_routes_independent_lifecycle() {
705 let harness = setup_test_harness(EndpointType::Unprivileged).await;
706
707 let route_a = DynamicRoute::http(
708 EndpointType::Unprivileged,
709 SimpleHandler {
710 path: "/a",
711 body: "alpha",
712 },
713 );
714 let route_b = DynamicRoute::http(
715 EndpointType::Unprivileged,
716 SimpleHandler {
717 path: "/b",
718 body: "bravo",
719 },
720 );
721 harness.assert_route("a", route_a).await;
722 harness.assert_route("b", route_b).await;
723
724 assert_status_eventually(harness.addr, "/a", StatusCode::OK).await;
725 assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
726
727 harness.retract_route("a").await;
729 assert_status_eventually(harness.addr, "/a", StatusCode::NOT_FOUND).await;
730
731 let body = assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
732 assert_eq!(body, "bravo");
733 }
734
735 #[tokio::test]
736 async fn ignores_routes_for_different_endpoint_type() {
737 let harness = setup_test_harness(EndpointType::Unprivileged).await;
738
739 let wrong_route = DynamicRoute::http(
741 EndpointType::Privileged,
742 SimpleHandler {
743 path: "/secret",
744 body: "secret",
745 },
746 );
747 harness.assert_route("secret", wrong_route).await;
748
749 let (status, _) = http_get(harness.addr, "/secret").await;
750 assert_eq!(status, StatusCode::NOT_FOUND);
751
752 let right_route = DynamicRoute::http(
754 EndpointType::Unprivileged,
755 SimpleHandler {
756 path: "/secret",
757 body: "not secret",
758 },
759 );
760 harness.assert_route("secret-unpriv", right_route).await;
761
762 let body = assert_status_eventually(harness.addr, "/secret", StatusCode::OK).await;
763 assert_eq!(body, "not secret");
764 }
765
766 #[tokio::test]
767 async fn overlapping_routes_do_not_crash_server() {
768 let harness = setup_test_harness(EndpointType::Unprivileged).await;
769
770 let route_1 = DynamicRoute::http(
772 EndpointType::Unprivileged,
773 SimpleHandler {
774 path: "/health",
775 body: "health-1",
776 },
777 );
778 harness.assert_route("health-1", route_1).await;
779 let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
780 assert_eq!(body, "health-1");
781
782 let route_2 = DynamicRoute::http(
785 EndpointType::Unprivileged,
786 SimpleHandler {
787 path: "/health",
788 body: "health-2",
789 },
790 );
791 harness.assert_route("health-2", route_2).await;
792
793 sleep(Duration::from_millis(200)).await;
795
796 let (status, body) = http_get(harness.addr, "/health").await;
798 assert_eq!(status, StatusCode::OK);
799 assert_eq!(body, "health-1");
800
801 let route_info = DynamicRoute::http(
803 EndpointType::Unprivileged,
804 SimpleHandler {
805 path: "/info",
806 body: "info",
807 },
808 );
809 harness.assert_route("info", route_info).await;
810 let body = assert_status_eventually(harness.addr, "/info", StatusCode::OK).await;
811 assert_eq!(body, "info");
812
813 harness.retract_route("health-1").await;
816 let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
817 assert_eq!(body, "health-2");
818 }
819
820 #[tokio::test]
821 async fn overlapping_route_retraction_then_reassertion() {
822 let harness = setup_test_harness(EndpointType::Unprivileged).await;
823
824 let route_a = DynamicRoute::http(
826 EndpointType::Unprivileged,
827 SimpleHandler {
828 path: "/overlap",
829 body: "a",
830 },
831 );
832 let route_b = DynamicRoute::http(
833 EndpointType::Unprivileged,
834 SimpleHandler {
835 path: "/overlap",
836 body: "b",
837 },
838 );
839 harness.assert_route("ov-a", route_a).await;
840 harness.assert_route("ov-b", route_b).await;
841
842 let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
844 assert_eq!(body, "a");
845
846 harness.retract_route("ov-a").await;
848 harness.retract_route("ov-b").await;
849 assert_status_eventually(harness.addr, "/overlap", StatusCode::NOT_FOUND).await;
850
851 let route_c = DynamicRoute::http(
853 EndpointType::Unprivileged,
854 SimpleHandler {
855 path: "/overlap",
856 body: "c",
857 },
858 );
859 harness.assert_route("ov-c", route_c).await;
860 let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
861 assert_eq!(body, "c");
862 }
863
864 #[tokio::test]
865 async fn static_handler_served_without_dynamic_routes() {
866 let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
867 b.with_handler(SimpleHandler {
868 path: "/static",
869 body: "static",
870 })
871 })
872 .await;
873
874 let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
875 assert_eq!(body, "static");
876 }
877
878 #[tokio::test]
879 async fn static_and_dynamic_routes_coexist() {
880 let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
881 b.with_handler(SimpleHandler {
882 path: "/static",
883 body: "static",
884 })
885 })
886 .await;
887
888 let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
890 assert_eq!(body, "static");
891
892 let dynamic_route = DynamicRoute::http(
894 EndpointType::Unprivileged,
895 SimpleHandler {
896 path: "/dynamic",
897 body: "dynamic",
898 },
899 );
900 harness.assert_route("dyn", dynamic_route).await;
901
902 let body = assert_status_eventually(harness.addr, "/dynamic", StatusCode::OK).await;
903 assert_eq!(body, "dynamic");
904
905 let (status, body) = http_get(harness.addr, "/static").await;
906 assert_eq!(status, StatusCode::OK);
907 assert_eq!(body, "static");
908
909 harness.retract_route("dyn").await;
911 assert_status_eventually(harness.addr, "/dynamic", StatusCode::NOT_FOUND).await;
912
913 let (status, body) = http_get(harness.addr, "/static").await;
914 assert_eq!(status, StatusCode::OK);
915 assert_eq!(body, "static");
916 }
917
918 #[tokio::test]
919 async fn unknown_grpc_method_returns_unimplemented() {
920 let harness = setup_test_harness(EndpointType::Unprivileged).await;
921 let (status, headers) = grpc_post(harness.addr, "/some.Service/Method").await;
922
923 assert_eq!(status, StatusCode::OK);
925 let grpc_status = headers.get("grpc-status").and_then(|v| v.to_str().ok());
926 assert_eq!(grpc_status, Some("12"));
927 }
928
929 #[tokio::test]
930 async fn static_route_wins_overlap_with_dynamic() {
931 let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
932 b.with_handler(SimpleHandler {
933 path: "/overlap",
934 body: "static",
935 })
936 })
937 .await;
938
939 let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
941 assert_eq!(body, "static");
942
943 let dynamic_route = DynamicRoute::http(
945 EndpointType::Unprivileged,
946 SimpleHandler {
947 path: "/overlap",
948 body: "dynamic",
949 },
950 );
951 harness.assert_route("dyn-overlap", dynamic_route).await;
952
953 sleep(Duration::from_millis(200)).await;
954
955 let (status, body) = http_get(harness.addr, "/overlap").await;
956 assert_eq!(status, StatusCode::OK);
957 assert_eq!(body, "static");
958 }
959}