saluki_io/net/client/http/
client.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{Context, Poll},
5 time::Duration,
6};
7
8use http::{Request, Response, Uri};
9use hyper::body::{Body, Incoming};
10use hyper_http_proxy::Proxy;
11use hyper_util::{
12 client::legacy::{connect::capture_connection, Builder},
13 rt::{TokioExecutor, TokioTimer},
14};
15use metrics::Counter;
16use saluki_error::GenericError;
17use saluki_metrics::MetricsBuilder;
18use saluki_tls::ClientTLSConfigBuilder;
19use stringtheory::MetaString;
20use tower::{
21 retry::Policy, timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _,
22};
23
24use super::{
25 conn::{check_connection_state, HttpsCapableConnectorBuilder},
26 EndpointTelemetryLayer,
27};
28use crate::net::util::retry::NoopRetryPolicy;
29
30#[derive(Clone)]
32pub struct HttpClient<B = ()> {
33 inner: BoxCloneService<Request<B>, Response<Incoming>, BoxError>,
34}
35
36impl HttpClient<()> {
37 pub fn builder() -> HttpClientBuilder {
39 HttpClientBuilder::default()
40 }
41}
42
43impl<B> HttpClient<B>
44where
45 B: Body + Clone + Send + Unpin + 'static,
46 B::Data: Send,
47 B::Error: Into<BoxError>,
48{
49 pub async fn send(&mut self, mut req: Request<B>) -> Result<Response<Incoming>, BoxError> {
55 let captured_conn = capture_connection(&mut req);
56 let result = self.inner.ready().await?.call(req).await;
57
58 check_connection_state(captured_conn);
59
60 result
61 }
62}
63
64impl<B> Service<Request<B>> for HttpClient<B>
65where
66 B: Body + Send + Unpin + 'static,
67 B::Data: Send,
68 B::Error: Into<BoxError>,
69{
70 type Response = Response<Incoming>;
71 type Error = BoxError;
72 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
73
74 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75 self.inner.poll_ready(cx)
76 }
77
78 fn call(&mut self, mut req: Request<B>) -> Self::Future {
79 let captured_conn = capture_connection(&mut req);
80 let fut = self.inner.call(req);
81
82 Box::pin(async move {
83 let result = fut.await;
84
85 check_connection_state(captured_conn);
86
87 result
88 })
89 }
90}
91
92pub struct HttpClientBuilder<P = NoopRetryPolicy> {
109 connector_builder: HttpsCapableConnectorBuilder,
110 hyper_builder: Builder,
111 tls_builder: ClientTLSConfigBuilder,
112 retry_policy: P,
113 request_timeout: Option<Duration>,
114 endpoint_telemetry: Option<EndpointTelemetryLayer>,
115 proxies: Option<Vec<Proxy>>,
116}
117
118impl<P> HttpClientBuilder<P> {
119 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
123 self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
124 self
125 }
126
127 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
134 self.request_timeout = Some(timeout);
135 self
136 }
137
138 pub fn without_request_timeout(mut self) -> Self {
143 self.request_timeout = None;
144 self
145 }
146
147 pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
154 where
155 L: Into<Option<Duration>>,
156 {
157 self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
158 self
159 }
160
161 pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
165 self.hyper_builder.pool_max_idle_per_host(max);
166 self
167 }
168
169 pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
176 self.hyper_builder.pool_idle_timeout(timeout);
177 self
178 }
179
180 pub fn with_retry_policy<P2>(self, retry_policy: P2) -> HttpClientBuilder<P2> {
186 HttpClientBuilder {
187 connector_builder: self.connector_builder,
188 hyper_builder: self.hyper_builder,
189 tls_builder: self.tls_builder,
190 request_timeout: self.request_timeout,
191 retry_policy,
192 endpoint_telemetry: self.endpoint_telemetry,
193 proxies: self.proxies,
194 }
195 }
196
197 pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
201 self.proxies = Some(proxies);
202 self
203 }
204
205 pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
209 where
210 F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
211 {
212 let mut layer = EndpointTelemetryLayer::default().with_metrics_builder(metrics_builder);
213
214 if let Some(endpoint_name_fn) = endpoint_name_fn {
215 layer = layer.with_endpoint_name_fn(endpoint_name_fn);
216 }
217
218 self.endpoint_telemetry = Some(layer);
219 self
220 }
221
222 pub fn with_tls_config<F>(mut self, f: F) -> Self
226 where
227 F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
228 {
229 self.tls_builder = f(self.tls_builder);
230 self
231 }
232
233 pub fn with_hyper_config<F>(mut self, f: F) -> Self
238 where
239 F: FnOnce(&mut Builder),
240 {
241 f(&mut self.hyper_builder);
242 self
243 }
244
245 pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
252 self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
253 self
254 }
255
256 pub fn build<B>(self) -> Result<HttpClient<B>, GenericError>
262 where
263 B: Body + Clone + Unpin + Send + 'static,
264 B::Data: Send,
265 B::Error: std::error::Error + Send + Sync,
266 P: Policy<Request<B>, Response<Incoming>, BoxError> + Send + Clone + 'static,
267 P::Future: Send,
268 {
269 let tls_config = self.tls_builder.build()?;
270 let connector = self.connector_builder.build(tls_config);
271 let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
275 if let Some(proxies) = &self.proxies {
276 for proxy in proxies {
277 proxy_connector.add_proxy(proxy.to_owned());
278 }
279 }
280 let client = self.hyper_builder.build(proxy_connector);
281
282 let inner = ServiceBuilder::new()
283 .retry(self.retry_policy)
284 .option_layer(self.request_timeout.map(TimeoutLayer::new))
285 .option_layer(self.endpoint_telemetry)
286 .service(client.map_err(BoxError::from))
287 .boxed_clone();
288
289 Ok(HttpClient { inner })
290 }
291}
292
293impl Default for HttpClientBuilder {
294 fn default() -> Self {
295 let mut hyper_builder = Builder::new(TokioExecutor::new());
296 hyper_builder
297 .pool_timer(TokioTimer::new())
298 .pool_max_idle_per_host(5)
299 .pool_idle_timeout(Duration::from_secs(45));
300
301 Self {
302 connector_builder: HttpsCapableConnectorBuilder::default(),
303 hyper_builder,
304 tls_builder: ClientTLSConfigBuilder::new(),
305 request_timeout: Some(Duration::from_secs(20)),
306 retry_policy: NoopRetryPolicy,
307 endpoint_telemetry: None,
308 proxies: None,
309 }
310 }
311}