saluki_io/net/client/http/
client.rs1#[cfg(unix)]
2use std::path::PathBuf;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use http::{Request, Response, Uri};
12use http_body::Body;
13use http_body_util::{combinators::BoxBody, BodyExt as _};
14use hyper::body::Incoming;
15use hyper_http_proxy::Proxy;
16use hyper_util::{
17 client::legacy::{connect::capture_connection, Builder},
18 rt::{TokioExecutor, TokioTimer},
19};
20use metrics::Counter;
21use saluki_error::GenericError;
22use saluki_metrics::MetricsBuilder;
23use saluki_tls::{ClientTLSConfigBuilder, TlsMinimumVersion};
24use stringtheory::MetaString;
25use tower::{timeout::TimeoutLayer, util::BoxCloneService, BoxError, Service, ServiceBuilder, ServiceExt as _};
26
27use super::{
28 conn::{check_connection_state, HttpProtocol, HttpsCapableConnectorBuilder},
29 telemetry::HttpTransactionErrorTelemetry,
30 EndpointTelemetryLayer,
31};
32
33pub type ClientBody = BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
38
39#[derive(Clone)]
41pub struct HttpClient {
42 inner: BoxCloneService<Request<ClientBody>, Response<Incoming>, BoxError>,
43}
44
45impl HttpClient {
46 pub fn builder() -> HttpClientBuilder {
48 HttpClientBuilder::default()
49 }
50
51 pub async fn send<B>(&mut self, req: Request<B>) -> Result<Response<Incoming>, GenericError>
60 where
61 B: Body + Send + Sync + 'static,
62 B::Data: Buf + Send,
63 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
64 {
65 let mut req = req.map(into_client_body);
66 let captured_conn = capture_connection(&mut req);
67 let result = self
68 .inner
69 .ready()
70 .await
71 .map_err(GenericError::from_boxed)?
72 .call(req)
73 .await;
74
75 check_connection_state(captured_conn);
76
77 result.map_err(GenericError::from_boxed)
78 }
79}
80
81impl Service<Request<ClientBody>> for HttpClient {
82 type Response = Response<Incoming>;
83 type Error = BoxError;
84 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
85
86 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
87 self.inner.poll_ready(cx)
88 }
89
90 fn call(&mut self, mut req: Request<ClientBody>) -> Self::Future {
91 let captured_conn = capture_connection(&mut req);
92 let fut = self.inner.call(req);
93
94 Box::pin(async move {
95 let result = fut.await;
96
97 check_connection_state(captured_conn);
98
99 result
100 })
101 }
102}
103
104pub fn into_client_body<B>(body: B) -> ClientBody
109where
110 B: Body + Send + Sync + 'static,
111 B::Data: Buf + Send,
112 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
113{
114 BoxBody::new(
115 body.map_frame(|frame| frame.map_data(|mut data| data.copy_to_bytes(data.remaining())))
116 .map_err(Into::into),
117 )
118}
119
120pub struct HttpClientBuilder {
137 connector_builder: HttpsCapableConnectorBuilder,
138 hyper_builder: Builder,
139 tls_builder: ClientTLSConfigBuilder,
140 request_timeout: Option<Duration>,
141 endpoint_telemetry: Option<EndpointTelemetryLayer>,
142 proxies: Option<Vec<Proxy>>,
143}
144
145impl HttpClientBuilder {
146 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
150 self.connector_builder = self.connector_builder.with_connect_timeout(timeout);
151 self
152 }
153
154 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
161 self.request_timeout = Some(timeout);
162 self
163 }
164
165 pub fn without_request_timeout(mut self) -> Self {
170 self.request_timeout = None;
171 self
172 }
173
174 pub fn with_http_protocol(mut self, protocol: HttpProtocol) -> Self {
178 self.connector_builder = self.connector_builder.with_http_protocol(protocol);
179 self
180 }
181
182 pub fn with_connection_age_limit<L>(mut self, limit: L) -> Self
189 where
190 L: Into<Option<Duration>>,
191 {
192 self.connector_builder = self.connector_builder.with_connection_age_limit(limit);
193 self
194 }
195
196 pub fn with_max_idle_conns_per_host(mut self, max: usize) -> Self {
200 self.hyper_builder.pool_max_idle_per_host(max);
201 self
202 }
203
204 pub fn with_idle_conn_timeout(mut self, timeout: Duration) -> Self {
211 self.hyper_builder.pool_idle_timeout(timeout);
212 self
213 }
214
215 pub fn with_proxies(mut self, proxies: Vec<Proxy>) -> Self {
219 self.proxies = Some(proxies);
220 self
221 }
222
223 pub fn with_endpoint_telemetry<F>(mut self, metrics_builder: MetricsBuilder, endpoint_name_fn: Option<F>) -> Self
227 where
228 F: Fn(&Uri) -> Option<MetaString> + Send + Sync + 'static,
229 {
230 let error_telemetry = HttpTransactionErrorTelemetry::from_builder(&metrics_builder);
231 self.connector_builder = self.connector_builder.with_error_telemetry(error_telemetry.clone());
232
233 let mut layer = EndpointTelemetryLayer::default()
234 .with_metrics_builder(metrics_builder)
235 .with_error_telemetry(error_telemetry);
236
237 if let Some(endpoint_name_fn) = endpoint_name_fn {
238 layer = layer.with_endpoint_name_fn(endpoint_name_fn);
239 }
240
241 self.endpoint_telemetry = Some(layer);
242 self
243 }
244
245 #[cfg(unix)]
253 pub fn with_unix_socket_path<P: Into<PathBuf>>(mut self, path: P) -> Self {
254 self.connector_builder = self.connector_builder.with_unix_socket_path(path);
255 self
256 }
257
258 pub fn with_tls_config<F>(mut self, f: F) -> Self
262 where
263 F: FnOnce(ClientTLSConfigBuilder) -> ClientTLSConfigBuilder,
264 {
265 self.tls_builder = f(self.tls_builder);
266 self
267 }
268
269 pub fn with_min_tls_version(mut self, version: TlsMinimumVersion) -> Self {
276 self.tls_builder = self.tls_builder.with_min_tls_version(version);
277 self
278 }
279
280 pub fn with_hyper_config<F>(mut self, f: F) -> Self
285 where
286 F: FnOnce(&mut Builder),
287 {
288 f(&mut self.hyper_builder);
289 self
290 }
291
292 pub fn with_bytes_sent_counter(mut self, counter: Counter) -> Self {
299 self.connector_builder = self.connector_builder.with_bytes_sent_counter(counter);
300 self
301 }
302
303 pub fn build(self) -> Result<HttpClient, GenericError> {
309 let tls_config = self.tls_builder.build()?;
310 let connector = self.connector_builder.build(tls_config)?;
311 let mut proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
315 if let Some(proxies) = &self.proxies {
316 for proxy in proxies {
317 proxy_connector.add_proxy(proxy.to_owned());
318 }
319 }
320 let client = self.hyper_builder.build(proxy_connector);
321
322 let inner = ServiceBuilder::new()
323 .option_layer(self.request_timeout.map(TimeoutLayer::new))
324 .option_layer(self.endpoint_telemetry)
325 .service(client.map_err(BoxError::from))
326 .boxed_clone();
327
328 Ok(HttpClient { inner })
329 }
330}
331
332impl Default for HttpClientBuilder {
333 fn default() -> Self {
334 let mut hyper_builder = Builder::new(TokioExecutor::new());
335 hyper_builder
336 .pool_timer(TokioTimer::new())
337 .pool_max_idle_per_host(5)
338 .pool_idle_timeout(Duration::from_secs(45));
339
340 Self {
341 connector_builder: HttpsCapableConnectorBuilder::default(),
342 hyper_builder,
343 tls_builder: ClientTLSConfigBuilder::new(),
344 request_timeout: Some(Duration::from_secs(20)),
345 endpoint_telemetry: None,
346 proxies: None,
347 }
348 }
349}