saluki_io/net/util/retry/policy/rolling_exponential.rs
use std::sync::{
    atomic::{
        AtomicU32,
        Ordering::{AcqRel, Relaxed},
    },
    Arc,
};

use tokio::time::{sleep, Sleep};
use tower::retry::Policy;
use tracing::debug;

use crate::net::util::retry::{
    classifier::RetryClassifier,
    lifecycle::{DefaultDebugRetryLifecycle, RetryLifecycle},
    ExponentialBackoff,
};

/// A rolling exponential backoff retry policy.
///
/// This policy applies an exponential backoff strategy to requests that are classified as needing to be retried, and
/// maintains a memory of how many errors have previously occurred in order to potentially alter the backoff behavior
/// of retried requests after a successful response has been received.
///
/// ## Rolling backoff behavior (recovery error decrease factor)
///
/// As responses are classified, the number of errors seen (any failed request constitutes an error) is tracked
/// internally, which is then used to drive the exponential backoff behavior. When a request finally succeeds, there
/// are two options: reset the error count back to zero, or decrease it by some fixed amount.
///
/// If the recovery error decrease factor isn't set at all, the error count is reset back to zero after any successful
/// response. This means that if the next request fails, the backoff duration starts back at a low value. If the
/// recovery error decrease factor is set, however, the error count is only decreased by that fixed amount after a
/// successful response, which means that if the next request fails, the calculated backoff duration will still be
/// reasonably close to the last calculated backoff duration.
///
/// Essentially, setting a recovery error decrease factor allows the calculated backoff duration to increase and
/// decrease more smoothly between failed requests that occur close together.
///
/// # Missing
///
/// - Ability to set an upper bound on retry attempts before giving up.
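///
/// # Example
///
/// A minimal sketch of configuring the rolling behavior. `MyClassifier` stands in for any type implementing
/// `RetryClassifier`, and `backoff` is assumed to be an already-constructed `ExponentialBackoff`:
///
/// ```ignore
/// let policy = RollingExponentialBackoffRetryPolicy::new(MyClassifier, backoff)
///     // Decrease the error count by 1 after each successful response instead of resetting it to zero.
///     .with_recovery_error_decrease_factor(Some(1));
/// ```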
#[derive(Clone)]
pub struct RollingExponentialBackoffRetryPolicy<C, L = DefaultDebugRetryLifecycle> {
    classifier: C,
    retry_lifecycle: L,
    backoff: ExponentialBackoff,
    recovery_error_decrease_factor: Option<u32>,
    error_count: Arc<AtomicU32>,
}

impl<C> RollingExponentialBackoffRetryPolicy<C> {
    /// Creates a new `RollingExponentialBackoffRetryPolicy` with the given classifier and exponential backoff strategy.
    ///
    /// On successful responses, the error count will be reset back to zero.
    pub fn new(classifier: C, backoff: ExponentialBackoff) -> Self {
        Self {
            classifier,
            retry_lifecycle: DefaultDebugRetryLifecycle,
            backoff,
            recovery_error_decrease_factor: None,
            error_count: Arc::new(AtomicU32::new(0)),
        }
    }
}

impl<C, L> RollingExponentialBackoffRetryPolicy<C, L> {
    /// Sets the recovery error decrease factor for this policy.
    ///
    /// The given value controls how much the error count should be decreased by after a successful response. If the
    /// value is `None`, then the error count will be reset back to zero after a successful response.
    ///
    /// Defaults to resetting the error count to zero after a successful response.
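    ///
    /// For example, with a decrease factor of 2, an error count of 5 drops to 3 after a single successful response,
    /// and further successes keep lowering it (saturating at zero) rather than resetting it outright.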
    pub fn with_recovery_error_decrease_factor(mut self, factor: Option<u32>) -> Self {
        self.recovery_error_decrease_factor = factor;
        self
    }

    /// Sets the retry lifecycle for this policy.
    ///
    /// `RetryLifecycle` allows defining custom hooks that are called at various points within the retry policy, such
    /// as when a request is classified as needing to be retried, when it succeeds, and so on. This can be used to add
    /// customized and contextual logging to retries.
    pub fn with_retry_lifecycle<L2>(self, retry_lifecycle: L2) -> RollingExponentialBackoffRetryPolicy<C, L2> {
        RollingExponentialBackoffRetryPolicy {
            classifier: self.classifier,
            retry_lifecycle,
            backoff: self.backoff,
            recovery_error_decrease_factor: self.recovery_error_decrease_factor,
            error_count: self.error_count,
        }
    }
}

impl<C, L, Req, Res, Error> Policy<Req, Res, Error> for RollingExponentialBackoffRetryPolicy<C, L>
where
    C: RetryClassifier<Res, Error>,
    L: RetryLifecycle<Req, Res, Error>,
    Req: Clone,
{
    type Future = Sleep;

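    // Returning `Some(future)` from `retry` tells the Tower retry middleware to wait for the future to complete and
    // then retry the (cloned) request, while returning `None` stops retrying and hands the current result back to the
    // caller.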
    fn retry(&mut self, request: &mut Req, response: &mut Result<Res, Error>) -> Option<Self::Future> {
        if self.classifier.should_retry(response) {
            // We got an error response, so update our error count and figure out how long to back off.
            let error_count = self.error_count.fetch_add(1, Relaxed) + 1;
            let backoff_dur = self.backoff.get_backoff_duration(error_count);

            self.retry_lifecycle
                .before_retry(request, response, backoff_dur, error_count);

            Some(sleep(backoff_dur))
        } else {
            self.retry_lifecycle.after_success(request, response);

            // We got a successful response, so update our error count if necessary.
            match self.recovery_error_decrease_factor {
                Some(factor) => {
                    debug!(decrease_factor = factor, "Decreasing error count after successful response.");

                    // We never expect this to fail since we never conditionally try to update: we _always_ want to
                    // decrease the error count.
                    let _ = self
                        .error_count
                        .fetch_update(AcqRel, Relaxed, |count| Some(count.saturating_sub(factor)));
                }
                None => {
                    debug!("Resetting error count to zero after successful response.");

                    self.error_count.store(0, Relaxed);
                }
            }

            None
        }
    }

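    // The retry middleware needs its own copy of the request, since the original is consumed by the inner service;
    // returning `None` here would disable retries for requests that cannot be cloned.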
    fn clone_request(&mut self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}
141}