saluki_io/net/client/http/
client.rs1use std::{
2 future::Future,
3 path::PathBuf,
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8
9use bytes::{Buf, Bytes};
10use http::{Request, Response, Uri};
11use http_body::Body;
12use http_body_util::{combinators::BoxBody, BodyExt as _};
13use hyper::body::Incoming;
14use hyper_http_proxy::Proxy;
15use hyper_util::{
16 client::legacy::{connect::capture_connection, Builder},
17 rt::{TokioExecutor, TokioTimer},
18};
19use metrics::Counter;
20use saluki_error::GenericError;
21use saluki_metrics::MetricsBuilder;
22use saluki_tls::{ClientTLSConfigBuilder, TlsMinimumVersion};
23use stringtheory::MetaString;
24use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
25
26use super::{
27 conn::{check_connection_state, HttpProtocol, HttpsCapableConnectorBuilder},
28 telemetry::HttpTransactionErrorTelemetry,
29 EndpointTelemetryLayer,
30};
31
32pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
37
38#[derive(Clone)]
40pub struct HttpClient {
41 inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
42}
43
44impl HttpClient {
45 pub fn builder() -> HttpClientBuilder {
47 HttpClientBuilder::default()
48 }
49
50 pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
59 where
60 B: Body + Send + Sync + 'static,
61 B::Data: Buf + Send,
62 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
63 {
64 let mut req = req.map(into_client_body);
65 let captured_conn = capture_connection(&mut req);
66 let result = self
67 .inner
68 .ready()
69 .await
70 .map_err(GenericError::from_boxed)?
71 .call(req)
72 .await;
73
74 check_connection_state(captured_conn);
75
76 result.map_err(GenericError::from_boxed)
77 }
78}
79
80impl Service<Request<ClientBody>> for HttpClient {
81 type Response = Response<Incoming>;
82 type Error = BoxError;
83 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
84
85 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 self.inner.poll_ready(cx)
87 }
88
89 fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
90 let captured_conn = capture_connection(&mut req);
91 let fut = self.inner.call(req);
92
93 Box::pin(async move {
94 let result = fut.await;
95
96 check_connection_state(captured_conn);
97
98 result
99 })
100 }
101}
102
103pub fn into_client_body<B>(body: B) -> ClientBody
108where
109 B: Body + Send + Sync + 'static,
110 B::Data: Buf + Send,
111 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
112{
113 BoxBody::new(
114 body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
115 .map_err(Into::into),
116 )
117}
118
119pub struct HttpClientBuilder {
136 connector_builder: HttpsCapableConnectorBuilder,
137 hyper_builder: Builder,
138 tls_builder: ClientTLSConfigBuilder,
139 request_timeout: Option<Duration>,
140 endpoint_telemetry: Option<EndpointTelemetryLayer>,
141 proxies: Option<Vec<Proxy>>,
142}
143
144impl HttpClientBuilder {
145 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
149 self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
150 self
151 }
152
153 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
160 self.request_timeout = Some(timeout);
161 self
162 }
163
164 pub fn without_request_timeout(mut self) -> Self {
169 self.request_timeout = None;
170 self
171 }
172
173 pub fn with_http_protocol(mut self, protocol: HttpProtocol) -> Self {
177 self.connector_builder = self.connector_builder.with_http_protocol(protocol);
178 self
179 }
180
181 pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
188 where
189 L: Into<Option<Duration>>,
190 {
191 self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
192 self
193 }
194
195 pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
199 self.hyper_builder.pool_max_idle_per_host(max);
200 self
201 }
202
203 pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
210 self.hyper_builder.pool_idle_timeout(timeout);
211 self
212 }
213
214 pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
218 self.proxies = Some(proxies);
219 self
220 }
221
222 pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
226 where
227 F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
228 {
229 let error_telemetry = HttpTransactionErrorTelemetry::from_builder(&metrics_builder);
230 self.connector_builder = self.connector_builder.with_error_telemetry(error_telemetry.clone());
231
232 let mut layer = EndpointTelemetryLayer::default()
233 .with_metrics_builder(metrics_builder)
234 .with_error_telemetry(error_telemetry);
235
236 if let Some(endpoint_name_fn) = endpoint_name_fn {
237 layer = layer.with_endpoint_name_fn(endpoint_name_fn);
238 }
239
240 self.endpoint_telemetry = Some(layer);
241 self
242 }
243
244 #[cfg(unix)]
252 pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
253 self.connector_builder = self.connector_builder.with_unix_socket_path(path);
254 self
255 }
256
257 pub fn with_tls_config<F>(mut self, f: F) -> Self
261 where
262 F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
263 {
264 self.tls_builder = f(self.tls_builder);
265 self
266 }
267
268 pub fn with_min_tls_version(mut self, version: TlsMinimumVersion) -> Self {
275 self.tls_builder = self.tls_builder.with_min_tls_version(version);
276 self
277 }
278
279 pub fn with_hyper_config<F>(mut self, f: F) -> Self
284 where
285 F: FnOnce(&mut Builder),
286 {
287 f(&mut self.hyper_builder);
288 self
289 }
290
291 pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
298 self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
299 self
300 }
301
302 pub fn build(self) -> Result<HttpClient, GenericError> {
308 let tls_config = self.tls_builder.build()?;
309 let connector = self.connector_builder.build(tls_config)?;
310 let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
314 if let Some(proxies) = &self.proxies {
315 for proxy in proxies {
316 proxy_connector.add_proxy(proxy.to_owned());
317 }
318 }
319 let client = self.hyper_builder.build(proxy_connector);
320
321 let inner = ServiceBuilder::new()
322 .option_layer(self.request_timeout.map(TimeoutLayer::new))
323 .option_layer(self.endpoint_telemetry)
324 .service(client.map_err(BoxError::from))
325 .boxed_clone();
326
327 Ok(HttpClient { inner })
328 }
329}
330
331impl Default for HttpClientBuilder {
332 fn default() -> Self {
333 let mut hyper_builder = Builder::new(TokioExecutor::new());
334 hyper_builder
335 .pool_timer(TokioTimer::new())
336 .pool_max_idle_per_host(5)
337 .pool_idle_timeout(Duration::from_secs(45));
338
339 Self {
340 connector_builder: HttpsCapableConnectorBuilder::default(),
341 hyper_builder,
342 tls_builder: ClientTLSConfigBuilder::new(),
343 request_timeout: Some(Duration::from_secs(20)),
344 endpoint_telemetry: None,
345 proxies: None,
346 }
347 }
348}