saluki_io/net/client/http/
client.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5    time::Duration,
6};
7
8use http::{Request, Response, Uri};
9use hyper::body::{Body, Incoming};
10use hyper_http_proxy::Proxy;
11use hyper_util::{
12    client::legacy::{connect::capture_connection, Builder},
13    rt::{TokioExecutor, TokioTimer},
14};
15use metrics::Counter;
16use saluki_error::GenericError;
17use saluki_metrics::MetricsBuilder;
18use saluki_tls::ClientTLSConfigBuilder;
19use stringtheory::MetaString;
20use tower::{
21    retry::Policy, timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _,
22};
23
24use super::{
25    conn::{check_connection_state, HttpsCapableConnectorBuilder},
26    EndpointTelemetryLayer,
27};
28use crate::net::util::retry::NoopRetryPolicy;
29
30/// An HTTP client.
31#[derive(Clone)]
32pub struct HttpClient<B = ()> {
33    inner: BoxCloneService<Request<B>, Response<Incoming>, BoxError>,
34}
35
36impl HttpClient<()> {
37    /// Creates a new builder for configuring an HTTP client.
38    pub fn builder() -> HttpClientBuilder {
39        HttpClientBuilder::default()
40    }
41}
42
43impl<B> HttpClient<B>
44where
45    B: Body + Clone + Send + Unpin + 'static,
46    B::Data: Send,
47    B::Error: Into<BoxError>,
48{
49    /// Sends a request to the server, and waits for a response.
50    ///
51    /// # Errors
52    ///
53    /// If there was an error sending the request, an error will be returned.
54    pub async fn send(&mut self, mut req: Request<B>) -> Result<Response<Incoming>, BoxError> {
55        let captured_conn = capture_connection(&mut req);
56        let result = self.inner.ready().await?.call(req).await;
57
58        check_connection_state(captured_conn);
59
60        result
61    }
62}
63
64impl<B> Service<Request<B>> for HttpClient<B>
65where
66    B: Body + Send + Unpin + 'static,
67    B::Data: Send,
68    B::Error: Into<BoxError>,
69{
70    type Response = Response<Incoming>;
71    type Error = BoxError;
72    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
73
74    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75        self.inner.poll_ready(cx)
76    }
77
78    fn call(&mut self, mut req: Request<B>) -> Self::Future {
79        let captured_conn = capture_connection(&mut req);
80        let fut = self.inner.call(req);
81
82        Box::pin(async move {
83            let result = fut.await;
84
85            check_connection_state(captured_conn);
86
87            result
88        })
89    }
90}
91
92/// An HTTP client builder.
93///
94/// Provides an ergonomic builder API for configuring an HTTP client.
95///
96/// # Defaults
97///
98/// A number of sensible defaults are provided:
99///
100/// - support for both HTTP and HTTPS (uses platform's root certificates for server certificate validation)
101/// - support for both HTTP/1.1 and HTTP/2 (automatically negotiated via ALPN)
102/// - non-infinite timeouts for various stages of the request lifecycle (30 second connect timeout, 60 second per-request timeout)
103/// - connection pool for reusing connections (45 second idle connection timeout, and a maximum of 5 idle connections
104///   per host)
105/// - support for FIPS-compliant cryptography (if the `fips` feature is enabled in the `saluki-tls` crate) via [AWS-LC][aws-lc]
106///
107/// [aws-lc]: https://github.com/aws/aws-lc-rs
108pub struct HttpClientBuilder<P = NoopRetryPolicy> {
109    connector_builder: HttpsCapableConnectorBuilder,
110    hyper_builder: Builder,
111    tls_builder: ClientTLSConfigBuilder,
112    retry_policy: P,
113    request_timeout: Option<Duration>,
114    endpoint_telemetry: Option<EndpointTelemetryLayer>,
115    proxies: Option<Vec<Proxy>>,
116}
117
118impl<P> HttpClientBuilder<P> {
119    /// Sets the timeout when connecting to the remote host.
120    ///
121    /// Defaults to 30 seconds.
122    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
123        self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
124        self
125    }
126
127    /// Sets the per-request timeout.
128    ///
129    /// The request timeout applies to each individual request made to the remote host, including each request made when
130    /// retrying a failed request.
131    ///
132    /// Defaults to 20 seconds.
133    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
134        self.request_timeout = Some(timeout);
135        self
136    }
137
138    /// Allow requests to run indefinitely.
139    ///
140    /// This means there will be no overall timeout for the request, but the request still may be subject to other
141    /// configuration settings, such as the connect timeout or retry policy.
142    pub fn without_request_timeout(mut self) -> Self {
143        self.request_timeout = None;
144        self
145    }
146
147    /// Sets the maximum age of a connection before it is closed.
148    ///
149    /// This is distinct from the maximum idle time: if any connection's age exceeds `limit`, it will be closed rather
150    /// than being reused and added to the idle connection pool.
151    ///
152    /// Defaults to no limit.
153    pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
154    where
155        L: Into<Option<Duration>>,
156    {
157        self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
158        self
159    }
160
161    /// Sets the maximum number of idle connections per host.
162    ///
163    /// Defaults to 5.
164    pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
165        self.hyper_builder.pool_max_idle_per_host(max);
166        self
167    }
168
169    /// Sets the idle connection timeout.
170    ///
171    /// Once a connection has been idle in the pool for longer than this duration, it will be closed and removed from
172    /// the pool.
173    ///
174    /// Defaults to 45 seconds.
175    pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
176        self.hyper_builder.pool_idle_timeout(timeout);
177        self
178    }
179
180    /// Sets the retry policy to use when sending requests.
181    ///
182    /// When set, the client will automatically retry requests that are classified as having failed.
183    ///
184    /// Defaults to no retry policy. (i.e. requests are not retried)
185    pub fn with_retry_policy<P2>(self, retry_policy: P2) -> HttpClientBuilder<P2> {
186        HttpClientBuilder {
187            connector_builder: self.connector_builder,
188            hyper_builder: self.hyper_builder,
189            tls_builder: self.tls_builder,
190            request_timeout: self.request_timeout,
191            retry_policy,
192            endpoint_telemetry: self.endpoint_telemetry,
193            proxies: self.proxies,
194        }
195    }
196
197    /// Sets the proxies to be used for outgoing requests.
198    ///
199    /// Defaults to no proxies. (i.e requests will be sent directly without using a proxy).
200    pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
201        self.proxies = Some(proxies);
202        self
203    }
204
205    /// Enables per-endpoint telemetry for HTTP transactions.
206    ///
207    /// See [`EndpointTelemetryLayer`] for more information.
208    pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
209    where
210        F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
211    {
212        let mut layer = EndpointTelemetryLayer::default().with_metrics_builder(metrics_builder);
213
214        if let Some(endpoint_name_fn) = endpoint_name_fn {
215            layer = layer.with_endpoint_name_fn(endpoint_name_fn);
216        }
217
218        self.endpoint_telemetry = Some(layer);
219        self
220    }
221
222    /// Sets the TLS configuration.
223    ///
224    /// A TLS configuration builder is provided to allow for more advanced configuration of the TLS connection.
225    pub fn with_tls_config<F>(mut self, f: F) -> Self
226    where
227        F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
228    {
229        self.tls_builder = f(self.tls_builder);
230        self
231    }
232
233    /// Sets the underlying Hyper client configuration.
234    ///
235    /// This is provided to allow for more advanced configuration of the Hyper client itself, and should generally be
236    /// used sparingly.
237    pub fn with_hyper_config<F>(mut self, f: F) -> Self
238    where
239        F: FnOnce(&mut Builder),
240    {
241        f(&mut self.hyper_builder);
242        self
243    }
244
245    /// Sets a counter that gets incremented with the number of bytes sent over the connection.
246    ///
247    /// This tracks bytes sent at the HTTP client level, which includes headers and body but does not include underlying
248    /// transport overhead, such as TLS handshaking, and so on.
249    ///
250    /// Defaults to unset.
251    pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
252        self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
253        self
254    }
255
256    /// Builds the `HttpClient`.
257    ///
258    /// # Errors
259    ///
260    /// If there was an error building the TLS configuration for the client, an error will be returned.
261    pub fn build<B>(self) -> Result<HttpClient<B>, GenericError>
262    where
263        B: Body + Clone + Unpin + Send + 'static,
264        B::Data: Send,
265        B::Error: std::error::Error + Send + Sync,
266        P: Policy<Request<B>, Response<Incoming>, BoxError> + Send + Clone + 'static,
267        P::Future: Send,
268    {
269        let tls_config = self.tls_builder.build()?;
270        let connector = self.connector_builder.build(tls_config);
271        // TODO(fips): Look into updating `hyper-http-proxy` to use the provided connector for establishing the
272        // connection to the proxy itself, even when the proxy is at an HTTPS URL, to ensure our desired TLS stack is
273        // being used.
274        let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
275        if let Some(proxies) = &self.proxies {
276            for proxy in proxies {
277                proxy_connector.add_proxy(proxy.to_owned());
278            }
279        }
280        let client = self.hyper_builder.build(proxy_connector);
281
282        let inner = ServiceBuilder::new()
283            .retry(self.retry_policy)
284            .option_layer(self.request_timeout.map(TimeoutLayer::new))
285            .option_layer(self.endpoint_telemetry)
286            .service(client.map_err(BoxError::from))
287            .boxed_clone();
288
289        Ok(HttpClient { inner })
290    }
291}
292
293impl Default for HttpClientBuilder {
294    fn default() -> Self {
295        let mut hyper_builder = Builder::new(TokioExecutor::new());
296        hyper_builder
297            .pool_timer(TokioTimer::new())
298            .pool_max_idle_per_host(5)
299            .pool_idle_timeout(Duration::from_secs(45));
300
301        Self {
302            connector_builder: HttpsCapableConnectorBuilder::default(),
303            hyper_builder,
304            tls_builder: ClientTLSConfigBuilder::new(),
305            request_timeout: Some(Duration::from_secs(20)),
306            retry_policy: NoopRetryPolicy,
307            endpoint_telemetry: None,
308            proxies: None,
309        }
310    }
311}