saluki_io/net/server/
http.rs

1//! Basic HTTP server.
2
3use std::{
4    future::Future,
5    pin::Pin,
6    sync::Arc,
7    task::{ready, Context, Poll},
8};
9
10use http::{Request, Response};
11use http_body::Body;
12use hyper::{body::Incoming, service::Service};
13use hyper_util::{
14    rt::{TokioExecutor, TokioIo},
15    server::conn::auto::Builder,
16};
17use rustls::ServerConfig;
18use saluki_common::task::{spawn_traced_named, HandleExt as _};
19use saluki_error::GenericError;
20use tokio::{runtime::Handle, select, sync::oneshot};
21use tokio_rustls::TlsAcceptor;
22use tracing::{debug, error, info};
23
24use crate::net::listener::ConnectionOrientedListener;
25
26/// An HTTP server.
27pub struct HttpServer<S> {
28    executor: Handle,
29    listener: ConnectionOrientedListener,
30    conn_builder: Builder<TokioExecutor>,
31    service: S,
32    tls_config: Option<ServerConfig>,
33}
34
35impl<S> HttpServer<S> {
36    /// Creates a new `HttpServer` from the given listener and service.
37    ///
38    /// # Panics
39    ///
40    /// This will panic if called outside the context of a Tokio runtime.
41    pub fn from_listener(listener: ConnectionOrientedListener, service: S) -> Self {
42        Self {
43            executor: Handle::current(),
44            listener,
45            conn_builder: Builder::new(TokioExecutor::new()),
46            service,
47            tls_config: None,
48        }
49    }
50
51    /// Sets the TLS configuration for the server.
52    ///
53    /// This will enable TLS for the server, and the server will only accept connections that are encrypted with TLS.
54    ///
55    /// Defaults to TLS being disabled.
56    pub fn with_tls_config(mut self, config: ServerConfig) -> Self {
57        self.tls_config = Some(config);
58        self
59    }
60
61    /// Sets the executor for the server.
62    ///
63    /// This executor will be used for spawning tasks to handle incoming connections, but _not_ for the spawn that accepts
64    /// new connections.
65    ///
66    /// Defaults to the current Tokio runtime at the time `HttpServer::new` is called.
67    pub fn with_executor(mut self, executor: Handle) -> Self {
68        self.executor = executor;
69        self
70    }
71}
72
73impl<S, B> HttpServer<S>
74where
75    S: Service<Request<Incoming>, Response = Response<B>> + Send + Clone + 'static,
76    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
77    S::Future: Send + 'static,
78    B: Body + Send + 'static,
79    B::Data: Send,
80    B::Error: std::error::Error + Send + Sync,
81{
82    /// Starts the server and listens for incoming connections.
83    ///
84    /// Returns two handles: one for shutting down the server, and one for receiving any errors that occur while the
85    /// server is running.
86    pub fn listen(self) -> (ShutdownHandle, ErrorHandle) {
87        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
88        let (error_tx, error_rx) = oneshot::channel();
89
90        let Self {
91            executor,
92            mut listener,
93            conn_builder,
94            service,
95            tls_config,
96            ..
97        } = self;
98
99        spawn_traced_named("http-server-acceptor", async move {
100            let tls_enabled = tls_config.is_some();
101            let maybe_tls_acceptor = tls_config.map(|mut config| {
102                // Allow for HTTP/1.1 and HTTP/2.
103                config.alpn_protocols.push(b"h2".to_vec());
104                config.alpn_protocols.push(b"http/1.1".to_vec());
105                TlsAcceptor::from(Arc::new(config))
106            });
107
108            info!(listen_addr = %listener.listen_address(), tls_enabled, "HTTP server started.");
109
110            loop {
111                select! {
112                    result = listener.accept() => match result {
113                        Ok(stream) => {
114                            let service = service.clone();
115                            let conn_builder = conn_builder.clone();
116                            let listen_addr = listener.listen_address().clone();
117                            match &maybe_tls_acceptor {
118                                Some(acceptor) => {
119                                    let tls_stream = match acceptor.accept(stream).await {
120                                        Ok(stream) => stream,
121                                        Err(e) => {
122                                            error!(%listen_addr, error = %e, "Failed to complete TLS handshake.");
123                                            continue
124                                        },
125                                    };
126
127                                    executor.spawn_traced_named("http-server-tls-conn-handler", async move {
128                                        if let Err(e) = conn_builder.serve_connection(TokioIo::new(tls_stream), service).await {
129                                            error!(%listen_addr, error = %e, "Failed to serve HTTP connection.");
130                                        }
131                                    });
132                                },
133                                None => {
134                                    executor.spawn_traced_named("http-server-conn-handler", async move {
135                                        if let Err(e) = conn_builder.serve_connection(TokioIo::new(stream), service).await {
136                                            error!(%listen_addr, error = %e, "Failed to serve HTTP connection.");
137                                        }
138                                    });
139                                },
140                            }
141                        },
142                        Err(e) => {
143                            let _ = error_tx.send(e.into());
144                            break;
145                        }
146                    },
147
148                    _ = &mut shutdown_rx => {
149                        debug!(listen_addr = %listener.listen_address(), "Received shutdown signal.");
150                        break;
151                    }
152                }
153            }
154
155            info!(listen_addr = %listener.listen_address(), "HTTP server stopped.");
156        });
157
158        (ShutdownHandle(shutdown_tx), ErrorHandle(error_rx))
159    }
160}
161
162/// A handle for shutting down an [`HttpServer`].
163pub struct ShutdownHandle(oneshot::Sender<()>);
164
165impl ShutdownHandle {
166    /// Triggers the server to shutdown.
167    ///
168    /// This method does not wait for shutdown to occur.
169    pub fn shutdown(self) {
170        let _ = self.0.send(());
171    }
172}
173
174/// A future that resolves when [`HttpServer`] encounters an unrecoverable error.
175pub struct ErrorHandle(oneshot::Receiver<GenericError>);
176
177impl Future for ErrorHandle {
178    type Output = Option<GenericError>;
179
180    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
181        match ready!(Pin::new(&mut self.0).poll(cx)) {
182            Ok(err) => Poll::Ready(Some(err)),
183            Err(_) => Poll::Ready(None),
184        }
185    }
186}