1use std::{
8 convert::Infallible,
9 error::Error,
10 future::Future,
11 net::SocketAddr,
12 panic::{catch_unwind, AssertUnwindSafe},
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16};
17
18use arc_swap::ArcSwap;
19use async_trait::async_trait;
20use axum::{body::Body as AxumBody, Router};
21use http::{Request, Response};
22use rcgen::{generate_simple_self_signed, CertifiedKey};
23use rustls::{pki_types::PrivateKeyDer, ServerConfig};
24use rustls_pki_types::PrivatePkcs8KeyDer;
25use saluki_api::{APIHandler, DynamicRoute, EndpointProtocol, EndpointType};
26use saluki_common::{collections::FastIndexMap, sync::shutdown::ShutdownHandle};
27use saluki_core::runtime::{
28 state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter, Subscription},
29 InitializationError, Supervisable, SupervisorFuture,
30};
31use saluki_error::{generic_error, GenericError};
32use saluki_io::net::{
33 listener::ConnectionOrientedListener,
34 server::{http::HttpServer, multiplex_service::MultiplexService},
35 util::hyper::TowerToHyperService,
36 ListenAddress,
37};
38use tokio::select;
39use tonic::{body::Body as GrpcBody, server::NamedService, service::RoutesBuilder};
40use tower::Service;
41use tracing::{debug, info, warn};
42
/// Socket address that a dynamic API endpoint's listener is actually bound to.
///
/// [`DynamicAPIBuilder`] asserts this value into the dataspace during initialization, under an identifier named
/// after the endpoint worker (`dynamic-unprivileged-api` or `dynamic-privileged-api`). Other workers can subscribe to
/// it to discover the concrete address, e.g. when the endpoint was configured to listen on port `0`.
#[derive(Clone, Debug)]
pub struct BoundApiAddress(pub SocketAddr);
48
/// Builder for an API endpoint whose routes can change at runtime.
///
/// Handlers and gRPC services registered on the builder itself form the static base of the endpoint. Once the
/// endpoint is running, routes asserted into the dataspace as `DynamicRoute` values for the same endpoint type are
/// layered on top of that base, and removed again when they are retracted.
///
/// HTTP and gRPC traffic are both served from the same listener.
pub struct DynamicAPIBuilder {
    /// Which endpoint (unprivileged or privileged) this builder represents.
    endpoint_type: EndpointType,
    /// Address the endpoint listens on.
    listen_address: ListenAddress,
    /// Optional TLS configuration; when `None`, the server is started without TLS.
    tls_config: Option<ServerConfig>,
    /// Statically registered HTTP routes.
    http_router: Router,
    /// Statically registered gRPC services.
    grpc_router: RoutesBuilder,
}
82
83impl DynamicAPIBuilder {
84 pub fn new(endpoint_type: EndpointType, listen_address: ListenAddress) -> Self {
86 Self {
87 endpoint_type,
88 listen_address,
89 tls_config: None,
90 http_router: Router::new(),
91 grpc_router: RoutesBuilder::default(),
92 }
93 }
94
95 pub fn with_handler<H>(mut self, handler: H) -> Self
100 where
101 H: APIHandler,
102 {
103 let handler_router = handler.generate_routes();
104 let handler_state = handler.generate_initial_state();
105 self.http_router = self.http_router.merge(handler_router.with_state(handler_state));
106 self
107 }
108
109 pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
114 where
115 H: APIHandler,
116 {
117 if let Some(handler) = handler {
118 self.with_handler(handler)
119 } else {
120 self
121 }
122 }
123
124 pub fn with_grpc_service<S>(mut self, svc: S) -> Self
126 where
127 S: Service<Request<GrpcBody>, Response = Response<GrpcBody>, Error = Infallible>
128 + NamedService
129 + Clone
130 + Send
131 + Sync
132 + 'static,
133 S::Future: Send + 'static,
134 S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
135 {
136 self.grpc_router.add_service(svc);
137 self
138 }
139
140 pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
142 self.tls_config = Some(config);
143 self
144 }
145
146 pub fn with_self_signed_tls(self) -> Self {
148 let CertifiedKey { cert, signing_key } = generate_simple_self_signed(["localhost".to_owned()]).unwrap();
149 let cert_chain = vec![cert.der().clone()];
150 let key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der()));
151
152 let config = ServerConfig::builder()
153 .with_no_client_auth()
154 .with_single_cert(cert_chain, key)
155 .unwrap();
156
157 self.with_tls_config(config)
158 }
159}
160
161#[async_trait]
162impl Supervisable for DynamicAPIBuilder {
163 fn name(&self) -> &str {
164 match self.endpoint_type {
165 EndpointType::Unprivileged => "dynamic-unprivileged-api",
166 EndpointType::Privileged => "dynamic-privileged-api",
167 }
168 }
169
170 async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
171 let base_http = self.http_router.clone();
178 let base_grpc = self.grpc_router.clone().routes().into_axum_router().reset_fallback();
179
180 let (inner_http, outer_http) = create_dynamic_router(base_http.clone());
184 let (inner_grpc, outer_grpc) = create_dynamic_router(grpc_post_process(base_grpc.clone()));
185
186 let dataspace = DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;
187
188 let listener = ConnectionOrientedListener::from_listen_address(self.listen_address.clone())
190 .await
191 .map_err(|e| InitializationError::Failed { source: e.into() })?;
192
193 let bound_addr = listener
198 .local_addr()
199 .map_err(|e| InitializationError::Failed { source: e.into() })?;
200 dataspace.assert(BoundApiAddress(bound_addr), Identifier::named(self.name()));
201
202 let multiplexed_service = TowerToHyperService::new(MultiplexService::new(outer_http, outer_grpc));
203
204 let mut http_server = HttpServer::from_listener(listener, multiplexed_service);
205 if let Some(tls_config) = self.tls_config.clone() {
206 http_server = http_server.with_tls_config(tls_config);
207 }
208 let (server_shutdown_coordinator, error_handle) = http_server.listen();
209
210 let endpoint_type = self.endpoint_type;
211 let listen_address = self.listen_address.clone();
212
213 Ok(Box::pin(async move {
214 info!("Serving {} API on {}.", endpoint_type.name(), listen_address);
215
216 let route_assertions = dataspace.subscribe::<DynamicRoute>(IdentifierFilter::All);
218
219 select! {
220 _ = process_shutdown => {
221 debug!(endpoint_type = endpoint_type.name(), "Triggering shutdown of dynamic API endpoint.");
223
224 server_shutdown_coordinator.shutdown_and_wait().await;
225
226 Ok(())
227 },
228 maybe_err = error_handle => match maybe_err {
229 Some(e) => Err(GenericError::from(e)),
230 None => Ok(()),
231 },
232 result = run_event_loop(inner_http, inner_grpc, base_http, base_grpc, route_assertions, endpoint_type) => result,
233 }
234 }))
235 }
236}
237
/// A `tower` service that forwards each request to the router currently stored in a shared, swappable slot.
///
/// The slot can be replaced at any time (see `rebuild_router`); requests that arrive afterwards are served by the
/// new router.
#[derive(Clone)]
struct DynamicRouterService {
    /// Shared slot holding the router that requests are currently dispatched to.
    inner_router: Arc<ArcSwap<Router>>,
}
248
249impl DynamicRouterService {
250 fn from_inner(inner_router: &Arc<ArcSwap<Router>>) -> Self {
251 Self {
252 inner_router: Arc::clone(inner_router),
253 }
254 }
255}
256
257impl Service<http::Request<AxumBody>> for DynamicRouterService {
258 type Response = Response<AxumBody>;
259 type Error = Infallible;
260 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
261
262 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263 Poll::Ready(Ok(()))
264 }
265
266 fn call(&mut self, request: http::Request<AxumBody>) -> Self::Future {
267 let mut router = Arc::unwrap_or_clone(self.inner_router.load_full());
268 Box::pin(async move { router.call(request).await })
269 }
270}
271
272#[allow(clippy::too_many_arguments)]
274async fn run_event_loop(
275 inner_http: Arc<ArcSwap<Router>>, inner_grpc: Arc<ArcSwap<Router>>, base_http: Router, base_grpc: Router,
276 mut route_assertions: Subscription<DynamicRoute>, endpoint_type: EndpointType,
277) -> Result<(), GenericError> {
278 let mut http_handlers = FastIndexMap::default();
279 let mut grpc_handlers = FastIndexMap::default();
280
281 while let Some(update) = route_assertions.recv().await {
282 let mut rebuild_http = false;
283 let mut rebuild_grpc = false;
284
285 match update {
286 AssertionUpdate::Asserted(id, route) => {
287 if route.endpoint_type() != endpoint_type {
288 continue;
289 }
290
291 match route.endpoint_protocol() {
292 EndpointProtocol::Http => {
293 debug!(?id, "Registering dynamic HTTP handler.");
294 http_handlers.insert(id, route.into_router());
295
296 rebuild_http = true;
297 }
298 EndpointProtocol::Grpc => {
299 debug!(?id, "Registering dynamic gRPC handler.");
300 grpc_handlers.insert(id, route.into_router());
301
302 rebuild_grpc = true;
303 }
304 }
305 }
306 AssertionUpdate::Retracted(id) => {
307 if http_handlers.swap_remove(&id).is_some() {
308 debug!(?id, "Withdrawing dynamic HTTP handler.");
309 rebuild_http = true;
310 }
311
312 if grpc_handlers.swap_remove(&id).is_some() {
313 debug!(?id, "Withdrawing dynamic gRPC handler.");
314 rebuild_grpc = true;
315 }
316 }
317 }
318
319 if rebuild_http {
320 rebuild_router(&inner_http, &base_http, &http_handlers, http_post_process);
321 }
322
323 if rebuild_grpc {
324 rebuild_router(&inner_grpc, &base_grpc, &grpc_handlers, grpc_post_process);
325 }
326 }
327
328 Ok(())
329}
330
331fn create_dynamic_router(initial: Router) -> (Arc<ArcSwap<Router>>, Router) {
334 let inner = Arc::new(ArcSwap::from_pointee(initial));
335 let outer = Router::new().fallback_service(DynamicRouterService::from_inner(&inner));
336 (inner, outer)
337}
338
339fn try_merge_router(base: &Router, id: &Identifier, other: &Router) -> Result<Router, String> {
356 let candidate = base.clone();
357 match catch_unwind(AssertUnwindSafe(|| candidate.merge(other.clone()))) {
358 Ok(merged) => Ok(merged),
359 Err(payload) => {
360 let reason = payload
361 .downcast_ref::<String>()
362 .map(|s| s.as_str())
363 .or_else(|| payload.downcast_ref::<&str>().copied())
364 .unwrap_or("unknown");
365 Err(format!("failed to merge dynamic handler {id:?}: {reason}"))
366 }
367 }
368}
369
370fn rebuild_router(
373 inner_router: &Arc<ArcSwap<Router>>, base: &Router, handlers: &FastIndexMap<Identifier, Router>,
374 post_process: fn(Router) -> Router,
375) {
376 let mut merged = base.clone();
377 let mut skipped = 0usize;
378
379 for (id, router) in handlers.iter() {
380 let resetable = router.clone().reset_fallback();
381 match try_merge_router(&merged, id, &resetable) {
382 Ok(new_merged) => merged = new_merged,
383 Err(reason) => {
384 warn!(%reason, "Skipping dynamic handler due to overlapping route.");
385 skipped += 1;
386 }
387 }
388 }
389
390 let merged = post_process(merged);
391 inner_router.store(Arc::new(merged));
392 debug!(handler_count = handlers.len(), skipped, "Rebuilt inner router.");
393}
394
/// Post-processing step for rebuilt HTTP routers: returns the router unchanged.
///
/// Exists so that HTTP and gRPC rebuilds can share `rebuild_router`, which takes a post-processing function.
fn http_post_process(router: Router) -> Router {
    router
}
398
/// Post-processing step for gRPC routers: installs a fallback that answers unmatched requests with the gRPC
/// "unimplemented" status.
fn grpc_post_process(router: Router) -> Router {
    router.fallback(grpc_unimplemented)
}
403
/// Fallback handler for gRPC requests that match no registered service.
///
/// Returns the HTTP response form of a gRPC "unimplemented" status with an empty message.
async fn grpc_unimplemented() -> Response<AxumBody> {
    tonic::Status::unimplemented("").into_http()
}
407
// End-to-end tests: each test runs a real `DynamicAPIBuilder` under a supervisor, drives route assertions and
// retractions through the dataspace, and checks the observable behavior over real HTTP requests.
#[cfg(test)]
mod tests {
    use std::{net::SocketAddr, time::Duration};

    use async_trait::async_trait;
    use axum::Router;
    use http_body_util::{BodyExt as _, Empty};
    use hyper::{body::Bytes, StatusCode};
    use hyper_util::{client::legacy::Client, rt::TokioExecutor};
    use saluki_api::{APIHandler, DynamicRoute, EndpointType};
    use saluki_core::runtime::{
        state::{AssertionUpdate, DataspaceRegistry, Identifier, IdentifierFilter},
        InitializationError, Supervisable, Supervisor, SupervisorFuture,
    };
    use tokio::{
        pin, select,
        sync::{mpsc, oneshot},
        task::JoinHandle,
        time::{sleep, timeout, Instant},
    };

    use super::*;

    /// Minimal stateless handler that answers `GET <path>` with a fixed body.
    struct SimpleHandler {
        path: &'static str,
        body: &'static str,
    }

    impl APIHandler for SimpleHandler {
        type State = ();

        fn generate_initial_state(&self) -> Self::State {}

        fn generate_routes(&self) -> Router<Self::State> {
            let body = self.body;
            Router::new().route(self.path, axum::routing::get(move || async move { body }))
        }
    }

    /// Commands sent from a test to the `RouteAsserter` worker.
    enum RouteCommand {
        /// Assert `route` into the dataspace under `id`.
        Assert { id: Identifier, route: DynamicRoute },
        /// Retract the `DynamicRoute` previously asserted under `id`.
        Retract { id: Identifier },
    }

    /// Helper worker that runs under the same supervisor as the API endpoint.
    ///
    /// It waits for the endpoint to publish its bound address, reports that address back to the test, and then
    /// applies route commands received from the test to the dataspace.
    struct RouteAsserter {
        // Both channels are wrapped in `Mutex<Option<..>>` so they can be moved out of `&self` in `initialize`.
        commands_rx: std::sync::Mutex<Option<mpsc::Receiver<RouteCommand>>>,
        addr_tx: std::sync::Mutex<Option<oneshot::Sender<SocketAddr>>>,
        endpoint_type: EndpointType,
    }

    #[async_trait]
    impl Supervisable for RouteAsserter {
        fn name(&self) -> &str {
            "route-asserter"
        }

        async fn initialize(&self, process_shutdown: ShutdownHandle) -> Result<SupervisorFuture, InitializationError> {
            // The command receiver can only be taken once, so a second initialization fails explicitly.
            let mut commands_rx =
                self.commands_rx
                    .lock()
                    .unwrap()
                    .take()
                    .ok_or_else(|| InitializationError::Failed {
                        source: generic_error!("RouteAsserter can only be initialized once"),
                    })?;
            let addr_tx = self.addr_tx.lock().unwrap().take();
            let endpoint_type = self.endpoint_type;

            Ok(Box::pin(async move {
                let dataspace =
                    DataspaceRegistry::try_current().ok_or_else(|| generic_error!("Dataspace not available."))?;

                // Must match the names returned by `DynamicAPIBuilder::name`, which is the identifier the bound
                // address is asserted under.
                let bound_addr_name = match endpoint_type {
                    EndpointType::Unprivileged => "dynamic-unprivileged-api",
                    EndpointType::Privileged => "dynamic-privileged-api",
                };
                let mut addr_sub =
                    dataspace.subscribe::<BoundApiAddress>(IdentifierFilter::exact(Identifier::named(bound_addr_name)));

                let addr = match addr_sub.recv().await {
                    Some(AssertionUpdate::Asserted(_, BoundApiAddress(mut addr))) => {
                        // An unspecified address (e.g. 0.0.0.0) is not connectable; use loopback for requests.
                        if addr.ip().is_unspecified() {
                            addr.set_ip(std::net::Ipv4Addr::LOCALHOST.into());
                        }
                        addr
                    }
                    other => return Err(generic_error!("unexpected bound address update: {:?}", other)),
                };

                if let Some(tx) = addr_tx {
                    let _ = tx.send(addr);
                }

                // Pin the shutdown future so it can be polled by reference across loop iterations.
                pin!(process_shutdown);

                loop {
                    select! {
                        _ = &mut process_shutdown => break,
                        cmd = commands_rx.recv() => {
                            // A closed command channel means the test harness was dropped.
                            let Some(cmd) = cmd else { break };
                            match cmd {
                                RouteCommand::Assert { id, route } => {
                                    dataspace.assert(route, id);
                                }
                                RouteCommand::Retract { id } => {
                                    dataspace.retract::<DynamicRoute>(id);
                                }
                            }
                        }
                    }
                }

                Ok(())
            }))
        }
    }

    /// Handle to a running endpoint under test.
    struct TestHarness {
        /// Connectable address of the endpoint.
        addr: SocketAddr,
        /// Channel for sending route commands to the `RouteAsserter`.
        commands: mpsc::Sender<RouteCommand>,
        // Held only to keep the supervisor's shutdown channel and task alive for the duration of the test.
        _shutdown: oneshot::Sender<()>,
        _handle: JoinHandle<()>,
    }

    impl TestHarness {
        /// Asks the `RouteAsserter` to assert `route` under `id`.
        async fn assert_route(&self, id: impl Into<Identifier>, route: DynamicRoute) {
            self.commands
                .send(RouteCommand::Assert { id: id.into(), route })
                .await
                .unwrap();
        }

        /// Asks the `RouteAsserter` to retract the route asserted under `id`.
        async fn retract_route(&self, id: impl Into<Identifier>) {
            self.commands
                .send(RouteCommand::Retract { id: id.into() })
                .await
                .unwrap();
        }
    }

    /// Starts an endpoint of the given type with no static handlers.
    async fn setup_test_harness(endpoint_type: EndpointType) -> TestHarness {
        setup_test_harness_with(endpoint_type, |b| b).await
    }

    /// Starts an endpoint of the given type, letting `configure` customize the builder first.
    ///
    /// The endpoint listens on an ephemeral TCP port; the function returns once the bound address is known.
    async fn setup_test_harness_with<F>(endpoint_type: EndpointType, configure: F) -> TestHarness
    where
        F: FnOnce(DynamicAPIBuilder) -> DynamicAPIBuilder,
    {
        let (commands_tx, commands_rx) = mpsc::channel(16);
        let (addr_tx, addr_rx) = oneshot::channel();

        let api_builder = configure(DynamicAPIBuilder::new(endpoint_type, ListenAddress::any_tcp(0)));
        let route_asserter = RouteAsserter {
            commands_rx: std::sync::Mutex::new(Some(commands_rx)),
            addr_tx: std::sync::Mutex::new(Some(addr_tx)),
            endpoint_type,
        };

        let mut sup = Supervisor::new("test-dynamic-api").unwrap();
        sup.add_worker(api_builder);
        sup.add_worker(route_asserter);

        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let handle = tokio::spawn(async move {
            let _ = sup.run_with_shutdown(shutdown_rx).await;
        });

        let addr = timeout(Duration::from_secs(5), addr_rx)
            .await
            .expect("timed out waiting for bound address")
            .expect("addr channel closed");

        TestHarness {
            addr,
            commands: commands_tx,
            _shutdown: shutdown_tx,
            _handle: handle,
        }
    }

    /// Performs a plain HTTP GET and returns the status and the body as a (lossily decoded) string.
    async fn http_get(addr: SocketAddr, path: &str) -> (StatusCode, String) {
        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
        let uri = format!("http://{}{}", addr, path);
        let resp = client.get(uri.parse().unwrap()).await.unwrap();
        let status = resp.status();
        let body = resp.into_body().collect().await.unwrap().to_bytes();
        let body_str = String::from_utf8_lossy(&body).into_owned();
        (status, body_str)
    }

    /// Sends an empty POST with `content-type: application/grpc` and returns the status and response headers.
    async fn grpc_post(addr: SocketAddr, path: &str) -> (StatusCode, http::HeaderMap) {
        let client: Client<_, Empty<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
        let uri: hyper::Uri = format!("http://{}{}", addr, path).parse().unwrap();
        let req = hyper::Request::builder()
            .uri(uri)
            .method(hyper::Method::POST)
            .header(hyper::header::CONTENT_TYPE, "application/grpc")
            .body(Empty::<Bytes>::new())
            .unwrap();
        let resp = client.request(req).await.unwrap();
        (resp.status(), resp.headers().clone())
    }

    /// Polls `path` every 50ms until it returns `expected`, and returns the response body.
    ///
    /// Route updates are applied asynchronously, so tests cannot assume they are visible immediately. Panics if the
    /// expected status has not been observed once the two-second deadline has passed.
    async fn assert_status_eventually(addr: SocketAddr, path: &str, expected: StatusCode) -> String {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            let (status, body) = http_get(addr, path).await;
            if status == expected {
                return body;
            }
            if Instant::now() > deadline {
                panic!("expected {} for {} but got {}", expected, path, status);
            }
            sleep(Duration::from_millis(50)).await;
        }
    }

    // An asserted HTTP route for the matching endpoint type becomes reachable.
    #[tokio::test]
    async fn serves_asserted_http_route() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        let route = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/health",
                body: "ok",
            },
        );
        harness.assert_route("health", route).await;

        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
        assert_eq!(body, "ok");
    }

    // With no routes registered at all, requests get a 404.
    #[tokio::test]
    async fn returns_404_for_unknown_route() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;
        let (status, _) = http_get(harness.addr, "/nonexistent").await;
        assert_eq!(status, StatusCode::NOT_FOUND);
    }

    // Retracting a route makes it unreachable again.
    #[tokio::test]
    async fn route_retraction_removes_route() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        let route = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/temp",
                body: "temporary",
            },
        );
        harness.assert_route("temp", route).await;
        assert_status_eventually(harness.addr, "/temp", StatusCode::OK).await;

        harness.retract_route("temp").await;
        assert_status_eventually(harness.addr, "/temp", StatusCode::NOT_FOUND).await;
    }

    // Retracting one route leaves other, unrelated routes in place.
    #[tokio::test]
    async fn multiple_routes_independent_lifecycle() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        let route_a = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/a",
                body: "alpha",
            },
        );
        let route_b = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/b",
                body: "bravo",
            },
        );
        harness.assert_route("a", route_a).await;
        harness.assert_route("b", route_b).await;

        assert_status_eventually(harness.addr, "/a", StatusCode::OK).await;
        assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;

        harness.retract_route("a").await;
        // Only `/a` disappears...
        assert_status_eventually(harness.addr, "/a", StatusCode::NOT_FOUND).await;

        // ...while `/b` keeps being served.
        let body = assert_status_eventually(harness.addr, "/b", StatusCode::OK).await;
        assert_eq!(body, "bravo");
    }

    // Routes asserted for the other endpoint type are not served by this endpoint.
    #[tokio::test]
    async fn ignores_routes_for_different_endpoint_type() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        // Asserted for the privileged endpoint, while the harness runs the unprivileged one.
        let wrong_route = DynamicRoute::http(
            EndpointType::Privileged,
            SimpleHandler {
                path: "/secret",
                body: "secret",
            },
        );
        harness.assert_route("secret", wrong_route).await;

        let (status, _) = http_get(harness.addr, "/secret").await;
        assert_eq!(status, StatusCode::NOT_FOUND);

        // The same path asserted for the matching endpoint type is served. Seeing this body also shows that the
        // privileged route was never registered, since it would otherwise have claimed the path first.
        let right_route = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/secret",
                body: "not secret",
            },
        );
        harness.assert_route("secret-unpriv", right_route).await;

        let body = assert_status_eventually(harness.addr, "/secret", StatusCode::OK).await;
        assert_eq!(body, "not secret");
    }

    // Two dynamic handlers claiming the same path must not take the server down.
    #[tokio::test]
    async fn overlapping_routes_do_not_crash_server() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        // First claimant of `/health`.
        let route_1 = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/health",
                body: "health-1",
            },
        );
        harness.assert_route("health-1", route_1).await;
        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
        assert_eq!(body, "health-1");

        // Second claimant of the same path; merging it is expected to fail, so it should be skipped rather than
        // crash the endpoint.
        let route_2 = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/health",
                body: "health-2",
            },
        );
        harness.assert_route("health-2", route_2).await;

        // There is no observable change to poll for, so give the event loop time to process the assertion.
        sleep(Duration::from_millis(200)).await;

        // The first handler is still the one being served.
        let (status, body) = http_get(harness.addr, "/health").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body, "health-1");

        // The endpoint still accepts new, non-conflicting routes after the conflict.
        let route_info = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/info",
                body: "info",
            },
        );
        harness.assert_route("info", route_info).await;
        let body = assert_status_eventually(harness.addr, "/info", StatusCode::OK).await;
        assert_eq!(body, "info");

        harness.retract_route("health-1").await;
        // The skipped handler stayed registered, so once the first claimant is gone the rebuild picks it up.
        let body = assert_status_eventually(harness.addr, "/health", StatusCode::OK).await;
        assert_eq!(body, "health-2");
    }

    // A path that had conflicting claimants can be cleanly reused after both are retracted.
    #[tokio::test]
    async fn overlapping_route_retraction_then_reassertion() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;

        // Two handlers claiming the same path.
        let route_a = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/overlap",
                body: "a",
            },
        );
        let route_b = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/overlap",
                body: "b",
            },
        );
        harness.assert_route("ov-a", route_a).await;
        harness.assert_route("ov-b", route_b).await;

        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
        // The handler asserted first wins.
        assert_eq!(body, "a");

        harness.retract_route("ov-a").await;
        // Retract the second claimant as well, so that nothing serves the path anymore.
        harness.retract_route("ov-b").await;
        assert_status_eventually(harness.addr, "/overlap", StatusCode::NOT_FOUND).await;

        // A fresh handler can now claim the path.
        let route_c = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/overlap",
                body: "c",
            },
        );
        harness.assert_route("ov-c", route_c).await;
        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
        assert_eq!(body, "c");
    }

    // Handlers registered on the builder are served even if no dynamic route is ever asserted.
    #[tokio::test]
    async fn static_handler_served_without_dynamic_routes() {
        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
            b.with_handler(SimpleHandler {
                path: "/static",
                body: "static",
            })
        })
        .await;

        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
        assert_eq!(body, "static");
    }

    // Static routes survive dynamic routes being added and removed.
    #[tokio::test]
    async fn static_and_dynamic_routes_coexist() {
        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
            b.with_handler(SimpleHandler {
                path: "/static",
                body: "static",
            })
        })
        .await;

        let body = assert_status_eventually(harness.addr, "/static", StatusCode::OK).await;
        // The static route is available from the start.
        assert_eq!(body, "static");

        // Add a dynamic route next to it.
        let dynamic_route = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/dynamic",
                body: "dynamic",
            },
        );
        harness.assert_route("dyn", dynamic_route).await;

        let body = assert_status_eventually(harness.addr, "/dynamic", StatusCode::OK).await;
        assert_eq!(body, "dynamic");

        let (status, body) = http_get(harness.addr, "/static").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body, "static");

        harness.retract_route("dyn").await;
        // Removing the dynamic route must not take the static one with it.
        assert_status_eventually(harness.addr, "/dynamic", StatusCode::NOT_FOUND).await;

        let (status, body) = http_get(harness.addr, "/static").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body, "static");
    }

    // gRPC requests for unregistered methods are answered with the gRPC "unimplemented" status.
    #[tokio::test]
    async fn unknown_grpc_method_returns_unimplemented() {
        let harness = setup_test_harness(EndpointType::Unprivileged).await;
        let (status, headers) = grpc_post(harness.addr, "/some.Service/Method").await;

        assert_eq!(status, StatusCode::OK);
        // gRPC reports the outcome in the `grpc-status` header; 12 is the code for "unimplemented".
        let grpc_status = headers.get("grpc-status").and_then(|v| v.to_str().ok());
        assert_eq!(grpc_status, Some("12"));
    }

    // A dynamic route that collides with a static route is skipped; the static route keeps being served.
    #[tokio::test]
    async fn static_route_wins_overlap_with_dynamic() {
        let harness = setup_test_harness_with(EndpointType::Unprivileged, |b| {
            b.with_handler(SimpleHandler {
                path: "/overlap",
                body: "static",
            })
        })
        .await;

        let body = assert_status_eventually(harness.addr, "/overlap", StatusCode::OK).await;
        // Baseline: the static handler answers.
        assert_eq!(body, "static");

        // Dynamic handler claiming the path the static handler already owns.
        let dynamic_route = DynamicRoute::http(
            EndpointType::Unprivileged,
            SimpleHandler {
                path: "/overlap",
                body: "dynamic",
            },
        );
        harness.assert_route("dyn-overlap", dynamic_route).await;

        // There is no observable change to poll for, so give the event loop time to process the assertion.
        sleep(Duration::from_millis(200)).await;

        let (status, body) = http_get(harness.addr, "/overlap").await;
        assert_eq!(status, StatusCode::OK);
        assert_eq!(body, "static");
    }
}