saluki_io/net/server/
http.rs1use std::{
4 future::Future,
5 pin::Pin,
6 sync::Arc,
7 task::{ready, Context, Poll},
8};
9
10use http::{Request, Response};
11use http_body::Body;
12use hyper::{body::Incoming, service::Service};
13use hyper_util::{
14 rt::{TokioExecutor, TokioIo},
15 server::conn::auto::Builder,
16};
17use rustls::ServerConfig;
18use saluki_common::task::{spawn_traced_named, HandleExt as _};
19use saluki_error::GenericError;
20use tokio::{runtime::Handle, select, sync::oneshot};
21use tokio_rustls::TlsAcceptor;
22use tracing::{debug, error, info};
23
24use crate::net::listener::ConnectionOrientedListener;
25
26pub struct HttpServer<S> {
28 executor: Handle,
29 listener: ConnectionOrientedListener,
30 conn_builder: Builder<TokioExecutor>,
31 service: S,
32 tls_config: Option<ServerConfig>,
33}
34
35impl<S> HttpServer<S> {
36 pub fn from_listener(listener: ConnectionOrientedListener, service: S) -> Self {
42 Self {
43 executor: Handle::current(),
44 listener,
45 conn_builder: Builder::new(TokioExecutor::new()),
46 service,
47 tls_config: None,
48 }
49 }
50
51 pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
57 self.tls_config = Some(config);
58 self
59 }
60
61 pub fn with_executor(mut self, executor: Handle) -> Self {
68 self.executor = executor;
69 self
70 }
71}
72
73impl<S, B> HttpServer<S>
74where
75 S: Service<Request<Incoming>, Response = Response<B>> + Send + Clone + 'static,
76 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
77 S::Future: Send + 'static,
78 B: Body + Send + 'static,
79 B::Data: Send,
80 B::Error: std::error::Error + Send + Sync,
81{
82 pub fn listen(self) -> (ShutdownHandle, ErrorHandle) {
87 let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
88 let (error_tx, error_rx) = oneshot::channel();
89
90 let Self {
91 executor,
92 mut listener,
93 conn_builder,
94 service,
95 tls_config,
96 ..
97 } = self;
98
99 spawn_traced_named("http-server-acceptor", async move {
100 let tls_enabled = tls_config.is_some();
101 let maybe_tls_acceptor = tls_config.map(|mut config| {
102 config.alpn_protocols.push(b"h2".to_vec());
104 config.alpn_protocols.push(b"http/1.1".to_vec());
105 TlsAcceptor::from(Arc::new(config))
106 });
107
108 info!(listen_addr = %listener.listen_address(), tls_enabled, "HTTP server started.");
109
110 loop {
111 select! {
112 result = listener.accept() => match result {
113 Ok(stream) => {
114 let service = service.clone();
115 let conn_builder = conn_builder.clone();
116 let listen_addr = listener.listen_address().clone();
117 match &maybe_tls_acceptor {
118 Some(acceptor) => {
119 let tls_stream = match acceptor.accept(stream).await {
120 Ok(stream) => stream,
121 Err(e) => {
122 error!(%listen_addr, error = %e, "Failed to complete TLS handshake.");
123 continue
124 },
125 };
126
127 executor.spawn_traced_named("http-server-tls-conn-handler", async move {
128 if let Err(e) = conn_builder.serve_connection(TokioIo::new(tls_stream), service).await {
129 error!(%listen_addr, error = %e, "Failed to serve HTTP connection.");
130 }
131 });
132 },
133 None => {
134 executor.spawn_traced_named("http-server-conn-handler", async move {
135 if let Err(e) = conn_builder.serve_connection(TokioIo::new(stream), service).await {
136 error!(%listen_addr, error = %e, "Failed to serve HTTP connection.");
137 }
138 });
139 },
140 }
141 },
142 Err(e) => {
143 let _ = error_tx.send(e.into());
144 break;
145 }
146 },
147
148 _ = &mut shutdown_rx => {
149 debug!(listen_addr = %listener.listen_address(), "Received shutdown signal.");
150 break;
151 }
152 }
153 }
154
155 info!(listen_addr = %listener.listen_address(), "HTTP server stopped.");
156 });
157
158 (ShutdownHandle(shutdown_tx), ErrorHandle(error_rx))
159 }
160}
161
162pub struct ShutdownHandle(oneshot::Sender<()>);
164
165impl ShutdownHandle {
166 pub fn shutdown(self) {
170 let _ = self.0.send(());
171 }
172}
173
174pub struct ErrorHandle(oneshot::Receiver<GenericError>);
176
177impl Future for ErrorHandle {
178 type Output = Option<GenericError>;
179
180 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181 match ready!(Pin::new(&mut self.0).poll(cx)) {
182 Ok(err) => Poll::Ready(Some(err)),
183 Err(_) => Poll::Ready(None),
184 }
185 }
186}