Skip to main content

saluki_io/net/client/http/
client.rs

1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes};
10use http::{Request, Response, Uri};
11use http_body::Body;
12use http_body_util::{combinators::BoxBody, BodyExt as _};
13use hyper::body::Incoming;
14use hyper_http_proxy::Proxy;
15use hyper_util::{
16    client::legacy::{connect::capture_connection, Builder},
17    rt::{TokioExecutor, TokioTimer},
18};
19use metrics::Counter;
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use saluki_tls::ClientTLSConfigBuilder;
23use stringtheory::MetaString;
24use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
25
26use super::{
27    conn::{check_connection_state, HttpsCapableConnectorBuilder},
28    EndpointTelemetryLayer,
29};
30
31/// The type-erased body type used internally by [`HttpClient`].
32///
33/// All request bodies are converted to this type before being sent over the wire, which ensures a single
34/// monomorphization of the underlying HTTP/2 and TLS stacks regardless of the caller's body type.
35pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
36
37/// An HTTP client.
38#[derive(Clone)]
39pub struct HttpClient {
40    inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
41}
42
43impl HttpClient {
44    /// Creates a new builder for configuring an HTTP client.
45    pub fn builder() -> HttpClientBuilder {
46        HttpClientBuilder::default()
47    }
48
49    /// Sends a request to the server, and waits for a response.
50    ///
51    /// The request body is type-erased internally, so callers can use any body type that implements
52    /// [`Body`] with `Data` types that implement [`Buf`].
53    ///
54    /// # Errors
55    ///
56    /// If there was an error sending the request, an error will be returned.
57    pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
58    where
59        B: Body + Send + Sync + 'static,
60        B::Data: Buf + Send,
61        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
62    {
63        let mut req = req.map(into_client_body);
64        let captured_conn = capture_connection(&mut req);
65        let result = self
66            .inner
67            .ready()
68            .await
69            .map_err(GenericError::from_boxed)?
70            .call(req)
71            .await;
72
73        check_connection_state(captured_conn);
74
75        result.map_err(GenericError::from_boxed)
76    }
77}
78
79impl Service<Request<ClientBody>> for HttpClient {
80    type Response = Response<Incoming>;
81    type Error = BoxError;
82    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
83
84    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        self.inner.poll_ready(cx)
86    }
87
88    fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
89        let captured_conn = capture_connection(&mut req);
90        let fut = self.inner.call(req);
91
92        Box::pin(async move {
93            let result = fut.await;
94
95            check_connection_state(captured_conn);
96
97            result
98        })
99    }
100}
101
102/// Converts an arbitrary body into the type-erased [`ClientBody`].
103///
104/// This uses `Buf::copy_to_bytes` for the data conversion, which is zero-copy when the underlying
105/// data is already `Bytes`.
106pub fn into_client_body<B>(body: B) -> ClientBody
107where
108    B: Body + Send + Sync + 'static,
109    B::Data: Buf + Send,
110    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
111{
112    BoxBody::new(
113        body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
114            .map_err(Into::into),
115    )
116}
117
118/// An HTTP client builder.
119///
120/// Provides an ergonomic builder API for configuring an HTTP client.
121///
122/// # Defaults
123///
124/// A number of sensible defaults are provided:
125///
126/// - support for both HTTP and HTTPS (uses platform's root certificates for server certificate validation)
127/// - support for both HTTP/1.1 and HTTP/2 (automatically negotiated via ALPN)
128/// - non-infinite timeouts for various stages of the request lifecycle (30 second connect timeout, 60 second per-request timeout)
129/// - connection pool for reusing connections (45 second idle connection timeout, and a maximum of 5 idle connections
130///   per host)
131/// - support for FIPS-compliant cryptography (if the `fips` feature is enabled in the `saluki-tls` crate) via [AWS-LC][aws-lc]
132///
133/// [aws-lc]: https://github.com/aws/aws-lc-rs
134pub struct HttpClientBuilder {
135    connector_builder: HttpsCapableConnectorBuilder,
136    hyper_builder: Builder,
137    tls_builder: ClientTLSConfigBuilder,
138    request_timeout: Option<Duration>,
139    endpoint_telemetry: Option<EndpointTelemetryLayer>,
140    proxies: Option<Vec<Proxy>>,
141}
142
143impl HttpClientBuilder {
144    /// Sets the timeout when connecting to the remote host.
145    ///
146    /// Defaults to 30 seconds.
147    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
148        self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
149        self
150    }
151
152    /// Sets the per-request timeout.
153    ///
154    /// The request timeout applies to each individual request made to the remote host, including each request made when
155    /// retrying a failed request.
156    ///
157    /// Defaults to 20 seconds.
158    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
159        self.request_timeout = Some(timeout);
160        self
161    }
162
163    /// Allow requests to run indefinitely.
164    ///
165    /// This means there will be no overall timeout for the request, but the request still may be subject to other
166    /// configuration settings, such as the connect timeout or retry policy.
167    pub fn without_request_timeout(mut self) -> Self {
168        self.request_timeout = None;
169        self
170    }
171
172    /// Sets the maximum age of a connection before it is closed.
173    ///
174    /// This is distinct from the maximum idle time: if any connection's age exceeds `limit`, it will be closed rather
175    /// than being reused and added to the idle connection pool.
176    ///
177    /// Defaults to no limit.
178    pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
179    where
180        L: Into<Option<Duration>>,
181    {
182        self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
183        self
184    }
185
186    /// Sets the maximum number of idle connections per host.
187    ///
188    /// Defaults to 5.
189    pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
190        self.hyper_builder.pool_max_idle_per_host(max);
191        self
192    }
193
194    /// Sets the idle connection timeout.
195    ///
196    /// Once a connection has been idle in the pool for longer than this duration, it will be closed and removed from
197    /// the pool.
198    ///
199    /// Defaults to 45 seconds.
200    pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
201        self.hyper_builder.pool_idle_timeout(timeout);
202        self
203    }
204
205    /// Sets the proxies to be used for outgoing requests.
206    ///
207    /// Defaults to no proxies. (i.e requests will be sent directly without using a proxy).
208    pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
209        self.proxies = Some(proxies);
210        self
211    }
212
213    /// Enables per-endpoint telemetry for HTTP transactions.
214    ///
215    /// See [`EndpointTelemetryLayer`] for more information.
216    pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
217    where
218        F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
219    {
220        let mut layer = EndpointTelemetryLayer::default().with_metrics_builder(metrics_builder);
221
222        if let Some(endpoint_name_fn) = endpoint_name_fn {
223            layer = layer.with_endpoint_name_fn(endpoint_name_fn);
224        }
225
226        self.endpoint_telemetry = Some(layer);
227        self
228    }
229
230    /// Sets a Unix domain socket path to route all connections through.
231    ///
232    /// When set, the client will connect to this Unix socket instead of performing DNS resolution
233    /// and TCP connection. The URI host is ignored — all requests are sent through the configured
234    /// socket.
235    ///
236    /// Defaults to unset (TCP connections via DNS).
237    #[cfg(unix)]
238    pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
239        self.connector_builder = self.connector_builder.with_unix_socket_path(path);
240        self
241    }
242
243    /// Sets the TLS configuration.
244    ///
245    /// A TLS configuration builder is provided to allow for more advanced configuration of the TLS connection.
246    pub fn with_tls_config<F>(mut self, f: F) -> Self
247    where
248        F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
249    {
250        self.tls_builder = f(self.tls_builder);
251        self
252    }
253
254    /// Sets the underlying Hyper client configuration.
255    ///
256    /// This is provided to allow for more advanced configuration of the Hyper client itself, and should generally be
257    /// used sparingly.
258    pub fn with_hyper_config<F>(mut self, f: F) -> Self
259    where
260        F: FnOnce(&mut Builder),
261    {
262        f(&mut self.hyper_builder);
263        self
264    }
265
266    /// Sets a counter that gets incremented with the number of bytes sent over the connection.
267    ///
268    /// This tracks bytes sent at the HTTP client level, which includes headers and body but does not include underlying
269    /// transport overhead, such as TLS handshaking, and so on.
270    ///
271    /// Defaults to unset.
272    pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
273        self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
274        self
275    }
276
277    /// Builds the `HttpClient`.
278    ///
279    /// # Errors
280    ///
281    /// If there was an error building the TLS configuration for the client, an error will be returned.
282    pub fn build(self) -> Result<HttpClient, GenericError> {
283        let tls_config = self.tls_builder.build()?;
284        let connector = self.connector_builder.build(tls_config)?;
285        // TODO(fips): Look into updating `hyper-http-proxy` to use the provided connector for establishing the
286        // connection to the proxy itself, even when the proxy is at an HTTPS URL, to ensure our desired TLS stack is
287        // being used.
288        let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
289        if let Some(proxies) = &self.proxies {
290            for proxy in proxies {
291                proxy_connector.add_proxy(proxy.to_owned());
292            }
293        }
294        let client = self.hyper_builder.build(proxy_connector);
295
296        let inner = ServiceBuilder::new()
297            .option_layer(self.request_timeout.map(TimeoutLayer::new))
298            .option_layer(self.endpoint_telemetry)
299            .service(client.map_err(BoxError::from))
300            .boxed_clone();
301
302        Ok(HttpClient { inner })
303    }
304}
305
306impl Default for HttpClientBuilder {
307    fn default() -> Self {
308        let mut hyper_builder = Builder::new(TokioExecutor::new());
309        hyper_builder
310            .pool_timer(TokioTimer::new())
311            .pool_max_idle_per_host(5)
312            .pool_idle_timeout(Duration::from_secs(45));
313
314        Self {
315            connector_builder: HttpsCapableConnectorBuilder::default(),
316            hyper_builder,
317            tls_builder: ClientTLSConfigBuilder::new(),
318            request_timeout: Some(Duration::from_secs(20)),
319            endpoint_telemetry: None,
320            proxies: None,
321        }
322    }
323}