Skip to main content

saluki_io/net/client/http/
client.rs

1use std::{
2    future::Future,
3    path::PathBuf,
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes};
10use http::{Request, Response, Uri};
11use http_body::Body;
12use http_body_util::{combinators::BoxBody, BodyExt as _};
13use hyper::body::Incoming;
14use hyper_http_proxy::Proxy;
15use hyper_util::{
16    client::legacy::{connect::capture_connection, Builder},
17    rt::{TokioExecutor, TokioTimer},
18};
19use metrics::Counter;
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use saluki_tls::{ClientTLSConfigBuilder, TlsMinimumVersion};
23use stringtheory::MetaString;
24use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
25
26use super::{
27    conn::{check_connection_state, HttpProtocol, HttpsCapableConnectorBuilder},
28    telemetry::HttpTransactionErrorTelemetry,
29    EndpointTelemetryLayer,
30};
31
/// The type-erased body type used internally by [`HttpClient`].
///
/// All request bodies are converted to this type before being sent over the wire, which ensures a single
/// monomorphization of the underlying HTTP/2 and TLS stacks regardless of the caller's body type.
///
/// Data frames are [`Bytes`], and body errors are boxed `std::error::Error` trait objects. Use
/// [`into_client_body`] to convert an arbitrary body into this type.
pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
37
/// An HTTP client.
///
/// This is a thin wrapper around a type-erased, cloneable service that accepts `Request<ClientBody>` and yields
/// `Response<Incoming>`. Clients are created through [`HttpClient::builder`], and requests can be sent either with
/// [`HttpClient::send`] or through the [`Service`] implementation.
#[derive(Clone)]
pub struct HttpClient {
    // The assembled service stack produced by `HttpClientBuilder::build`: the Hyper client wrapped in the optional
    // request timeout and endpoint telemetry layers, boxed so the concrete layer types stay out of this type.
    inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
}
43
44impl HttpClient {
45    /// Creates a new builder for configuring an HTTP client.
46    pub fn builder() -> HttpClientBuilder {
47        HttpClientBuilder::default()
48    }
49
50    /// Sends a request to the server, and waits for a response.
51    ///
52    /// The request body is type-erased internally, so callers can use any body type that implements
53    /// [`Body`] with `Data` types that implement [`Buf`].
54    ///
55    /// # Errors
56    ///
57    /// If there was an error sending the request, an error will be returned.
58    pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
59    where
60        B: Body + Send + Sync + 'static,
61        B::Data: Buf + Send,
62        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
63    {
64        let mut req = req.map(into_client_body);
65        let captured_conn = capture_connection(&mut req);
66        let result = self
67            .inner
68            .ready()
69            .await
70            .map_err(GenericError::from_boxed)?
71            .call(req)
72            .await;
73
74        check_connection_state(captured_conn);
75
76        result.map_err(GenericError::from_boxed)
77    }
78}
79
80impl Service<Request<ClientBody>> for HttpClient {
81    type Response = Response<Incoming>;
82    type Error = BoxError;
83    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        self.inner.poll_ready(cx)
87    }
88
89    fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
90        let captured_conn = capture_connection(&mut req);
91        let fut = self.inner.call(req);
92
93        Box::pin(async move {
94            let result = fut.await;
95
96            check_connection_state(captured_conn);
97
98            result
99        })
100    }
101}
102
103/// Converts an arbitrary body into the type-erased [`ClientBody`].
104///
105/// This uses `Buf::copy_to_bytes` for the data conversion, which is zero-copy when the underlying
106/// data is already `Bytes`.
107pub fn into_client_body<B>(body: B) -> ClientBody
108where
109    B: Body + Send + Sync + 'static,
110    B::Data: Buf + Send,
111    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
112{
113    BoxBody::new(
114        body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
115            .map_err(Into::into),
116    )
117}
118
/// An HTTP client builder.
///
/// Provides an ergonomic builder API for configuring an HTTP client.
///
/// # Defaults
///
/// A number of sensible defaults are provided:
///
/// - support for both HTTP and HTTPS (uses platform's root certificates for server certificate validation)
/// - support for both HTTP/1.1 and HTTP/2 (automatically negotiated via ALPN)
/// - non-infinite timeouts for various stages of the request lifecycle (30 second connect timeout, 20 second
///   per-request timeout)
/// - connection pool for reusing connections (45 second idle connection timeout, and a maximum of 5 idle connections
///   per host)
/// - support for FIPS-compliant cryptography (if the `fips` feature is enabled in the `saluki-tls` crate) via [AWS-LC][aws-lc]
///
/// [aws-lc]: https://github.com/aws/aws-lc-rs
pub struct HttpClientBuilder {
    // Connector configuration: connect timeout, HTTP protocol selection, connection age limit, Unix socket path,
    // error telemetry, and the bytes-sent counter.
    connector_builder: HttpsCapableConnectorBuilder,
    // Hyper client configuration, including the idle connection pool settings.
    hyper_builder: Builder,
    // TLS configuration used when building the connector.
    tls_builder: ClientTLSConfigBuilder,
    // Per-request timeout; `None` means requests may run indefinitely.
    request_timeout: Option<Duration>,
    // Optional per-endpoint telemetry layer wrapped around the client.
    endpoint_telemetry: Option<EndpointTelemetryLayer>,
    // Proxies to register on the proxy connector; `None` means no proxies.
    proxies: Option<Vec<Proxy>>,
}
143
144impl HttpClientBuilder {
145    /// Sets the timeout when connecting to the remote host.
146    ///
147    /// Defaults to 30 seconds.
148    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
149        self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
150        self
151    }
152
153    /// Sets the per-request timeout.
154    ///
155    /// The request timeout applies to each individual request made to the remote host, including each request made when
156    /// retrying a failed request.
157    ///
158    /// Defaults to 20 seconds.
159    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
160        self.request_timeout = Some(timeout);
161        self
162    }
163
164    /// Allow requests to run indefinitely.
165    ///
166    /// This means there will be no overall timeout for the request, but the request still may be subject to other
167    /// configuration settings, such as the connect timeout or retry policy.
168    pub fn without_request_timeout(mut self) -> Self {
169        self.request_timeout = None;
170        self
171    }
172
173    /// Sets the HTTP protocol selection for client connections.
174    ///
175    /// Defaults to [`HttpProtocol::Auto`], which automatically negotiates HTTP/2 with HTTP/1.1 fallback.
176    pub fn with_http_protocol(mut self, protocol: HttpProtocol) -> Self {
177        self.connector_builder = self.connector_builder.with_http_protocol(protocol);
178        self
179    }
180
181    /// Sets the maximum age of a connection before it's closed.
182    ///
183    /// This is distinct from the maximum idle time: if any connection's age exceeds `limit`, it will be closed rather
184    /// than being reused and added to the idle connection pool.
185    ///
186    /// Defaults to no limit.
187    pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
188    where
189        L: Into<Option<Duration>>,
190    {
191        self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
192        self
193    }
194
195    /// Sets the maximum number of idle connections per host.
196    ///
197    /// Defaults to 5.
198    pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
199        self.hyper_builder.pool_max_idle_per_host(max);
200        self
201    }
202
203    /// Sets the idle connection timeout.
204    ///
205    /// Once a connection has been idle in the pool for longer than this duration, it will be closed and removed from
206    /// the pool.
207    ///
208    /// Defaults to 45 seconds.
209    pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
210        self.hyper_builder.pool_idle_timeout(timeout);
211        self
212    }
213
214    /// Sets the proxies to be used for outgoing requests.
215    ///
216    /// Defaults to no proxies. (i.e requests will be sent directly without using a proxy).
217    pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
218        self.proxies = Some(proxies);
219        self
220    }
221
222    /// Enables per-endpoint telemetry for HTTP transactions.
223    ///
224    /// See [`EndpointTelemetryLayer`] for more information.
225    pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
226    where
227        F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
228    {
229        let error_telemetry = HttpTransactionErrorTelemetry::from_builder(&metrics_builder);
230        self.connector_builder = self.connector_builder.with_error_telemetry(error_telemetry.clone());
231
232        let mut layer = EndpointTelemetryLayer::default()
233            .with_metrics_builder(metrics_builder)
234            .with_error_telemetry(error_telemetry);
235
236        if let Some(endpoint_name_fn) = endpoint_name_fn {
237            layer = layer.with_endpoint_name_fn(endpoint_name_fn);
238        }
239
240        self.endpoint_telemetry = Some(layer);
241        self
242    }
243
244    /// Sets a Unix domain socket path to route all connections through.
245    ///
246    /// When set, the client will connect to this Unix socket instead of performing DNS resolution
247    /// and TCP connection. The URI host is ignored—all requests are sent through the configured
248    /// socket.
249    ///
250    /// Defaults to unset (TCP connections via DNS).
251    #[cfg(unix)]
252    pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
253        self.connector_builder = self.connector_builder.with_unix_socket_path(path);
254        self
255    }
256
257    /// Sets the TLS configuration.
258    ///
259    /// A TLS configuration builder is provided to allow for more advanced configuration of the TLS connection.
260    pub fn with_tls_config<F>(mut self, f: F) -> Self
261    where
262        F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
263    {
264        self.tls_builder = f(self.tls_builder);
265        self
266    }
267
268    /// Sets the minimum TLS protocol version for HTTPS connections.
269    ///
270    /// Defaults to TLS 1.2.
271    ///
272    /// This updates the same TLS builder configured by [`Self::with_tls_config`], so call order matters when both
273    /// methods change the minimum TLS version.
274    pub fn with_min_tls_version(mut self, version: TlsMinimumVersion) -> Self {
275        self.tls_builder = self.tls_builder.with_min_tls_version(version);
276        self
277    }
278
279    /// Sets the underlying Hyper client configuration.
280    ///
281    /// This is provided to allow for more advanced configuration of the Hyper client itself, and should generally be
282    /// used sparingly.
283    pub fn with_hyper_config<F>(mut self, f: F) -> Self
284    where
285        F: FnOnce(&mut Builder),
286    {
287        f(&mut self.hyper_builder);
288        self
289    }
290
291    /// Sets a counter that gets incremented with the number of bytes sent over the connection.
292    ///
293    /// This tracks bytes sent at the HTTP client level, which includes headers and body but doesn't include underlying
294    /// transport overhead, such as TLS handshaking, and so on.
295    ///
296    /// Defaults to unset.
297    pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
298        self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
299        self
300    }
301
302    /// Builds the `HttpClient`.
303    ///
304    /// # Errors
305    ///
306    /// If there was an error building the TLS configuration for the client, an error will be returned.
307    pub fn build(self) -> Result<HttpClient, GenericError> {
308        let tls_config = self.tls_builder.build()?;
309        let connector = self.connector_builder.build(tls_config)?;
310        // TODO(fips): Look into updating `hyper-http-proxy` to use the provided connector for establishing the
311        // connection to the proxy itself, even when the proxy is at an HTTPS URL, to ensure our desired TLS stack is
312        // being used.
313        let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
314        if let Some(proxies) = &self.proxies {
315            for proxy in proxies {
316                proxy_connector.add_proxy(proxy.to_owned());
317            }
318        }
319        let client = self.hyper_builder.build(proxy_connector);
320
321        let inner = ServiceBuilder::new()
322            .option_layer(self.request_timeout.map(TimeoutLayer::new))
323            .option_layer(self.endpoint_telemetry)
324            .service(client.map_err(BoxError::from))
325            .boxed_clone();
326
327        Ok(HttpClient { inner })
328    }
329}
330
331impl Default for HttpClientBuilder {
332    fn default() -> Self {
333        let mut hyper_builder = Builder::new(TokioExecutor::new());
334        hyper_builder
335            .pool_timer(TokioTimer::new())
336            .pool_max_idle_per_host(5)
337            .pool_idle_timeout(Duration::from_secs(45));
338
339        Self {
340            connector_builder: HttpsCapableConnectorBuilder::default(),
341            hyper_builder,
342            tls_builder: ClientTLSConfigBuilder::new(),
343            request_timeout: Some(Duration::from_secs(20)),
344            endpoint_telemetry: None,
345            proxies: None,
346        }
347    }
348}