saluki_io/net/util/retry/
mod.rs1mod backoff;
2pub use self::backoff::ExponentialBackoff;
3
4mod classifier;
5pub use self::classifier::{HttpRetryPredicate, RetryClassifier, StandardHttpClassifier};
6
7mod lifecycle;
8pub use self::lifecycle::StandardHttpRetryLifecycle;
9
10mod policy;
11pub use self::policy::{NoopRetryPolicy, RollingExponentialBackoffRetryPolicy};
12
13mod queue;
14pub use self::queue::{
15 DiskUsageRetriever, DiskUsageRetrieverImpl, EventContainer, PersistedQueueArgs, PushResult, RetryQueue, Retryable,
16};
17
18pub type DefaultHttpRetryPolicy<B = ()> =
20 RollingExponentialBackoffRetryPolicy<StandardHttpClassifier<B>, StandardHttpRetryLifecycle>;
21
22impl<B: 'static> DefaultHttpRetryPolicy<B> {
23 pub fn with_backoff(backoff: ExponentialBackoff) -> Self {
27 Self::with_backoff_and_classifier(backoff, StandardHttpClassifier::new())
28 }
29
30 pub fn with_backoff_and_classifier(backoff: ExponentialBackoff, classifier: StandardHttpClassifier<B>) -> Self {
37 RollingExponentialBackoffRetryPolicy::new(classifier, backoff).with_retry_lifecycle(StandardHttpRetryLifecycle)
38 }
39}
40
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use http::{Request, Response, StatusCode};
    use tower::retry::Policy;

    use super::*;

    type BoxError = Box<dyn std::error::Error + Send + Sync>;
    type TestRequest = Request<()>;

    /// Backoff configuration shared by every test in this module.
    fn test_backoff() -> ExponentialBackoff {
        let one_ms = Duration::from_millis(1);
        let ten_ms = Duration::from_millis(10);
        ExponentialBackoff::with_jitter(one_ms, ten_ms, 2.0)
    }

    /// An empty-bodied `POST` request aimed at a fixed local URI.
    fn test_request() -> TestRequest {
        Request::post("http://localhost/intake").body(()).unwrap()
    }

    /// A successful service result wrapping an empty-bodied response with the given `status`.
    fn ok_response(status: StatusCode) -> Result<Response<()>, BoxError> {
        let mut response = Response::new(());
        *response.status_mut() = status;
        Ok(response)
    }

    /// Asks `policy` whether a response carrying `status` should be retried.
    ///
    /// Returns `true` when the policy hands back a retry future, `false` otherwise.
    fn would_retry(policy: &mut DefaultHttpRetryPolicy, status: StatusCode) -> bool {
        let mut req = test_request();
        let mut outcome = ok_response(status);
        let decision = Policy::<TestRequest, Response<()>, BoxError>::retry(policy, &mut req, &mut outcome);
        decision.is_some()
    }

    /// Feeds each `(status, expected)` pair to `policy` in order and checks the retry decision.
    ///
    /// Order is preserved deliberately: the policy is borrowed mutably on every call.
    fn assert_retry_decisions(policy: &mut DefaultHttpRetryPolicy, cases: &[(StatusCode, bool)]) {
        for &(status, expected) in cases {
            assert_eq!(
                would_retry(policy, status),
                expected,
                "unexpected retry decision for status {status}"
            );
        }
    }

    #[tokio::test]
    async fn default_http_retry_policy_with_backoff_uses_default_classifier() {
        let mut policy = DefaultHttpRetryPolicy::with_backoff(test_backoff());

        assert_retry_decisions(
            &mut policy,
            &[
                (StatusCode::OK, false),
                (StatusCode::FORBIDDEN, false),
                (StatusCode::BAD_REQUEST, false),
                (StatusCode::INTERNAL_SERVER_ERROR, true),
                (StatusCode::TOO_MANY_REQUESTS, true),
            ],
        );
    }

    #[tokio::test]
    async fn default_http_retry_policy_with_backoff_and_classifier_threads_predicate() {
        // Custom predicate that opts 403 responses into retrying.
        let retry_forbidden: HttpRetryPredicate = Arc::new(|resp| resp.status() == StatusCode::FORBIDDEN);
        let classifier = StandardHttpClassifier::new().with_predicate(retry_forbidden);
        let mut policy = DefaultHttpRetryPolicy::with_backoff_and_classifier(test_backoff(), classifier);

        assert_retry_decisions(
            &mut policy,
            &[
                (StatusCode::FORBIDDEN, true),
                (StatusCode::OK, false),
                (StatusCode::UNAUTHORIZED, false),
                (StatusCode::INTERNAL_SERVER_ERROR, true),
            ],
        );
    }
}