Skip to main content

saluki_io/net/client/http/
client.rs

1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use http::{Request, Response, Uri};
12use http_body::Body;
13use http_body_util::{combinators::BoxBody, BodyExt as _};
14use hyper::body::Incoming;
15use hyper_http_proxy::Proxy;
16use hyper_util::{
17    client::legacy::{connect::capture_connection, Builder},
18    rt::{TokioExecutor, TokioTimer},
19};
20use metrics::Counter;
21use saluki_error::GenericError;
22use saluki_metrics::MetricsBuilder;
23use saluki_tls::{ClientTLSConfigBuilder, TlsMinimumVersion};
24use stringtheory::MetaString;
25use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
26
27use super::{
28    conn::{check_connection_state, HttpProtocol, HttpsCapableConnectorBuilder},
29    telemetry::HttpTransactionErrorTelemetry,
30    EndpointTelemetryLayer,
31};
32
33/// The type-erased body type used internally by [`HttpClient`].
34///
35/// All request bodies are converted to this type before being sent over the wire, which ensures a single
36/// monomorphization of the underlying HTTP/2 and TLS stacks regardless of the caller's body type.
37pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
38
39/// An HTTP client.
40#[derive(Clone)]
41pub struct HttpClient {
42    inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
43}
44
45impl HttpClient {
46    /// Creates a new builder for configuring an HTTP client.
47    pub fn builder() -> HttpClientBuilder {
48        HttpClientBuilder::default()
49    }
50
51    /// Sends a request to the server, and waits for a response.
52    ///
53    /// The request body is type-erased internally, so callers can use any body type that implements
54    /// [`Body`] with `Data` types that implement [`Buf`].
55    ///
56    /// # Errors
57    ///
58    /// If there was an error sending the request, an error will be returned.
59    pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
60    where
61        B: Body + Send + Sync + 'static,
62        B::Data: Buf + Send,
63        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
64    {
65        let mut req = req.map(into_client_body);
66        let captured_conn = capture_connection(&mut req);
67        let result = self
68            .inner
69            .ready()
70            .await
71            .map_err(GenericError::from_boxed)?
72            .call(req)
73            .await;
74
75        check_connection_state(captured_conn);
76
77        result.map_err(GenericError::from_boxed)
78    }
79}
80
81impl Service<Request<ClientBody>> for HttpClient {
82    type Response = Response<Incoming>;
83    type Error = BoxError;
84    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
85
86    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87        self.inner.poll_ready(cx)
88    }
89
90    fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
91        let captured_conn = capture_connection(&mut req);
92        let fut = self.inner.call(req);
93
94        Box::pin(async move {
95            let result = fut.await;
96
97            check_connection_state(captured_conn);
98
99            result
100        })
101    }
102}
103
104/// Converts an arbitrary body into the type-erased [`ClientBody`].
105///
106/// This uses `Buf::copy_to_bytes` for the data conversion, which is zero-copy when the underlying
107/// data is already `Bytes`.
108pub fn into_client_body<B>(body: B) -> ClientBody
109where
110    B: Body + Send + Sync + 'static,
111    B::Data: Buf + Send,
112    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
113{
114    BoxBody::new(
115        body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
116            .map_err(Into::into),
117    )
118}
119
120/// An HTTP client builder.
121///
122/// Provides an ergonomic builder API for configuring an HTTP client.
123///
124/// # Defaults
125///
126/// A number of sensible defaults are provided:
127///
128/// - support for both HTTP and HTTPS (uses platform's root certificates for server certificate validation)
129/// - support for both HTTP/1.1 and HTTP/2 (automatically negotiated via ALPN)
130/// - non-infinite timeouts for various stages of the request lifecycle (30 second connect timeout, 60 second per-request timeout)
131/// - connection pool for reusing connections (45 second idle connection timeout, and a maximum of 5 idle connections
132///   per host)
133/// - support for FIPS-compliant cryptography (if the `fips` feature is enabled in the `saluki-tls` crate) via [AWS-LC][aws-lc]
134///
135/// [aws-lc]: https://github.com/aws/aws-lc-rs
136pub struct HttpClientBuilder {
137    connector_builder: HttpsCapableConnectorBuilder,
138    hyper_builder: Builder,
139    tls_builder: ClientTLSConfigBuilder,
140    request_timeout: Option<Duration>,
141    endpoint_telemetry: Option<EndpointTelemetryLayer>,
142    proxies: Option<Vec<Proxy>>,
143}
144
145impl HttpClientBuilder {
146    /// Sets the timeout when connecting to the remote host.
147    ///
148    /// Defaults to 30 seconds.
149    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
150        self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
151        self
152    }
153
154    /// Sets the per-request timeout.
155    ///
156    /// The request timeout applies to each individual request made to the remote host, including each request made when
157    /// retrying a failed request.
158    ///
159    /// Defaults to 20 seconds.
160    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
161        self.request_timeout = Some(timeout);
162        self
163    }
164
165    /// Allow requests to run indefinitely.
166    ///
167    /// This means there will be no overall timeout for the request, but the request still may be subject to other
168    /// configuration settings, such as the connect timeout or retry policy.
169    pub fn without_request_timeout(mut self) -> Self {
170        self.request_timeout = None;
171        self
172    }
173
174    /// Sets the HTTP protocol selection for client connections.
175    ///
176    /// Defaults to [`HttpProtocol::Auto`], which automatically negotiates HTTP/2 with HTTP/1.1 fallback.
177    pub fn with_http_protocol(mut self, protocol: HttpProtocol) -> Self {
178        self.connector_builder = self.connector_builder.with_http_protocol(protocol);
179        self
180    }
181
182    /// Sets the maximum age of a connection before it's closed.
183    ///
184    /// This is distinct from the maximum idle time: if any connection's age exceeds `limit`, it will be closed rather
185    /// than being reused and added to the idle connection pool.
186    ///
187    /// Defaults to no limit.
188    pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
189    where
190        L: Into<Option<Duration>>,
191    {
192        self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
193        self
194    }
195
196    /// Sets the maximum number of idle connections per host.
197    ///
198    /// Defaults to 5.
199    pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
200        self.hyper_builder.pool_max_idle_per_host(max);
201        self
202    }
203
204    /// Sets the idle connection timeout.
205    ///
206    /// Once a connection has been idle in the pool for longer than this duration, it will be closed and removed from
207    /// the pool.
208    ///
209    /// Defaults to 45 seconds.
210    pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
211        self.hyper_builder.pool_idle_timeout(timeout);
212        self
213    }
214
215    /// Sets the proxies to be used for outgoing requests.
216    ///
217    /// Defaults to no proxies. (i.e requests will be sent directly without using a proxy).
218    pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
219        self.proxies = Some(proxies);
220        self
221    }
222
223    /// Enables per-endpoint telemetry for HTTP transactions.
224    ///
225    /// See [`EndpointTelemetryLayer`] for more information.
226    pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
227    where
228        F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
229    {
230        let error_telemetry = HttpTransactionErrorTelemetry::from_builder(&metrics_builder);
231        self.connector_builder = self.connector_builder.with_error_telemetry(error_telemetry.clone());
232
233        let mut layer = EndpointTelemetryLayer::default()
234            .with_metrics_builder(metrics_builder)
235            .with_error_telemetry(error_telemetry);
236
237        if let Some(endpoint_name_fn) = endpoint_name_fn {
238            layer = layer.with_endpoint_name_fn(endpoint_name_fn);
239        }
240
241        self.endpoint_telemetry = Some(layer);
242        self
243    }
244
245    /// Sets a Unix domain socket path to route all connections through.
246    ///
247    /// When set, the client will connect to this Unix socket instead of performing DNS resolution
248    /// and TCP connection. The URI host is ignored—all requests are sent through the configured
249    /// socket.
250    ///
251    /// Defaults to unset (TCP connections via DNS).
252    #[cfg(unix)]
253    pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
254        self.connector_builder = self.connector_builder.with_unix_socket_path(path);
255        self
256    }
257
258    /// Sets the TLS configuration.
259    ///
260    /// A TLS configuration builder is provided to allow for more advanced configuration of the TLS connection.
261    pub fn with_tls_config<F>(mut self, f: F) -> Self
262    where
263        F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
264    {
265        self.tls_builder = f(self.tls_builder);
266        self
267    }
268
269    /// Sets the minimum TLS protocol version for HTTPS connections.
270    ///
271    /// Defaults to TLS 1.2.
272    ///
273    /// This updates the same TLS builder configured by [`Self::with_tls_config`], so call order matters when both
274    /// methods change the minimum TLS version.
275    pub fn with_min_tls_version(mut self, version: TlsMinimumVersion) -> Self {
276        self.tls_builder = self.tls_builder.with_min_tls_version(version);
277        self
278    }
279
280    /// Sets the underlying Hyper client configuration.
281    ///
282    /// This is provided to allow for more advanced configuration of the Hyper client itself, and should generally be
283    /// used sparingly.
284    pub fn with_hyper_config<F>(mut self, f: F) -> Self
285    where
286        F: FnOnce(&mut Builder),
287    {
288        f(&mut self.hyper_builder);
289        self
290    }
291
292    /// Sets a counter that gets incremented with the number of bytes sent over the connection.
293    ///
294    /// This tracks bytes sent at the HTTP client level, which includes headers and body but doesn't include underlying
295    /// transport overhead, such as TLS handshaking, and so on.
296    ///
297    /// Defaults to unset.
298    pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
299        self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
300        self
301    }
302
303    /// Builds the `HttpClient`.
304    ///
305    /// # Errors
306    ///
307    /// If there was an error building the TLS configuration for the client, an error will be returned.
308    pub fn build(self) -> Result<HttpClient, GenericError> {
309        let tls_config = self.tls_builder.build()?;
310        let connector = self.connector_builder.build(tls_config)?;
311        // TODO(fips): Look into updating `hyper-http-proxy` to use the provided connector for establishing the
312        // connection to the proxy itself, even when the proxy is at an HTTPS URL, to ensure our desired TLS stack is
313        // being used.
314        let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
315        if let Some(proxies) = &self.proxies {
316            for proxy in proxies {
317                proxy_connector.add_proxy(proxy.to_owned());
318            }
319        }
320        let client = self.hyper_builder.build(proxy_connector);
321
322        let inner = ServiceBuilder::new()
323            .option_layer(self.request_timeout.map(TimeoutLayer::new))
324            .option_layer(self.endpoint_telemetry)
325            .service(client.map_err(BoxError::from))
326            .boxed_clone();
327
328        Ok(HttpClient { inner })
329    }
330}
331
332impl Default for HttpClientBuilder {
333    fn default() -> Self {
334        let mut hyper_builder = Builder::new(TokioExecutor::new());
335        hyper_builder
336            .pool_timer(TokioTimer::new())
337            .pool_max_idle_per_host(5)
338            .pool_idle_timeout(Duration::from_secs(45));
339
340        Self {
341            connector_builder: HttpsCapableConnectorBuilder::default(),
342            hyper_builder,
343            tls_builder: ClientTLSConfigBuilder::new(),
344            request_timeout: Some(Duration::from_secs(20)),
345            endpoint_telemetry: None,
346            proxies: None,
347        }
348    }
349}