saluki_io/net/client/http/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8
9use bytes::{Buf, Bytes};
10use http::{Request, Response, Uri};
11use http_body::Body;
12use http_body_util::{combinators::BoxBody, BodyExt as _};
13use hyper::body::Incoming;
14use hyper_http_proxy::Proxy;
15use hyper_util::{
16 client::legacy::{connect::capture_connection, Builder},
17 rt::{TokioExecutor, TokioTimer},
18};
19use metrics::Counter;
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use saluki_tls::ClientTLSConfigBuilder;
23use stringtheory::MetaString;
24use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
25
26use super::{
27 conn::{check_connection_state, HttpsCapableConnectorBuilder},
28 EndpointTelemetryLayer,
29};
30
31pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
36
37#[derive(Clone)]
39pub struct HttpClient {
40 inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
41}
42
43impl HttpClient {
44 pub fn builder() -> HttpClientBuilder {
46 HttpClientBuilder::default()
47 }
48
49 pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
58 where
59 B: Body + Send + Sync + 'static,
60 B::Data: Buf + Send,
61 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
62 {
63 let mut req = req.map(into_client_body);
64 let captured_conn = capture_connection(&mut req);
65 let result = self
66 .inner
67 .ready()
68 .await
69 .map_err(GenericError::from_boxed)?
70 .call(req)
71 .await;
72
73 check_connection_state(captured_conn);
74
75 result.map_err(GenericError::from_boxed)
76 }
77}
78
79impl Service<Request<ClientBody>> for HttpClient {
80 type Response = Response<Incoming>;
81 type Error = BoxError;
82 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
83
84 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 self.inner.poll_ready(cx)
86 }
87
88 fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
89 let captured_conn = capture_connection(&mut req);
90 let fut = self.inner.call(req);
91
92 Box::pin(async move {
93 let result = fut.await;
94
95 check_connection_state(captured_conn);
96
97 result
98 })
99 }
100}
101
102pub fn into_client_body<B>(body: B) -> ClientBody
107where
108 B: Body + Send + Sync + 'static,
109 B::Data: Buf + Send,
110 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
111{
112 BoxBody::new(
113 body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
114 .map_err(Into::into),
115 )
116}
117
118pub struct HttpClientBuilder {
135 connector_builder: HttpsCapableConnectorBuilder,
136 hyper_builder: Builder,
137 tls_builder: ClientTLSConfigBuilder,
138 request_timeout: Option<Duration>,
139 endpoint_telemetry: Option<EndpointTelemetryLayer>,
140 proxies: Option<Vec<Proxy>>,
141}
142
143impl HttpClientBuilder {
144 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
148 self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
149 self
150 }
151
152 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
159 self.request_timeout = Some(timeout);
160 self
161 }
162
163 pub fn without_request_timeout(mut self) -> Self {
168 self.request_timeout = None;
169 self
170 }
171
172 pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
179 where
180 L: Into<Option<Duration>>,
181 {
182 self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
183 self
184 }
185
186 pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
190 self.hyper_builder.pool_max_idle_per_host(max);
191 self
192 }
193
194 pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
201 self.hyper_builder.pool_idle_timeout(timeout);
202 self
203 }
204
205 pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
209 self.proxies = Some(proxies);
210 self
211 }
212
213 pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
217 where
218 F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
219 {
220 let mut layer = EndpointTelemetryLayer::default().with_metrics_builder(metrics_builder);
221
222 if let Some(endpoint_name_fn) = endpoint_name_fn {
223 layer = layer.with_endpoint_name_fn(endpoint_name_fn);
224 }
225
226 self.endpoint_telemetry = Some(layer);
227 self
228 }
229
230 #[cfg(unix)]
238 pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
239 self.connector_builder = self.connector_builder.with_unix_socket_path(path);
240 self
241 }
242
243 pub fn with_tls_config<F>(mut self, f: F) -> Self
247 where
248 F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
249 {
250 self.tls_builder = f(self.tls_builder);
251 self
252 }
253
254 pub fn with_hyper_config<F>(mut self, f: F) -> Self
259 where
260 F: FnOnce(&mut Builder),
261 {
262 f(&mut self.hyper_builder);
263 self
264 }
265
266 pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
273 self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
274 self
275 }
276
277 pub fn build(self) -> Result<HttpClient, GenericError> {
283 let tls_config = self.tls_builder.build()?;
284 let connector = self.connector_builder.build(tls_config)?;
285 let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
289 if let Some(proxies) = &self.proxies {
290 for proxy in proxies {
291 proxy_connector.add_proxy(proxy.to_owned());
292 }
293 }
294 let client = self.hyper_builder.build(proxy_connector);
295
296 let inner = ServiceBuilder::new()
297 .option_layer(self.request_timeout.map(TimeoutLayer::new))
298 .option_layer(self.endpoint_telemetry)
299 .service(client.map_err(BoxError::from))
300 .boxed_clone();
301
302 Ok(HttpClient { inner })
303 }
304}
305
306impl Default for HttpClientBuilder {
307 fn default() -> Self {
308 let mut hyper_builder = Builder::new(TokioExecutor::new());
309 hyper_builder
310 .pool_timer(TokioTimer::new())
311 .pool_max_idle_per_host(5)
312 .pool_idle_timeout(Duration::from_secs(45));
313
314 Self {
315 connector_builder: HttpsCapableConnectorBuilder::default(),
316 hyper_builder,
317 tls_builder: ClientTLSConfigBuilder::new(),
318 request_timeout: Some(Duration::from_secs(20)),
319 endpoint_telemetry: None,
320 proxies: None,
321 }
322 }
323}