saluki_io/net/util/retry/policy/rolling_exponential.rs

use std::sync::{
    atomic::{
        AtomicU32,
        Ordering::{AcqRel, Relaxed},
    },
    Arc,
};

use tokio::time::{sleep, Sleep};
use tower::retry::Policy;
use tracing::debug;

use crate::net::util::retry::{
    classifier::RetryClassifier,
    lifecycle::{DefaultDebugRetryLifecycle, RetryLifecycle},
    ExponentialBackoff,
};

/// A rolling exponential backoff retry policy.
///
/// This policy applies an exponential backoff strategy to requests that are classified as needing to be retried, and
/// maintains a memory of how many errors have previously occurred in order to potentially alter the backoff behavior
/// of retried requests after a successful response has been received.
///
/// ## Rolling backoff behavior (recovery error decrease factor)
///
/// As responses are classified, the number of errors seen (any failed request constitutes an error) is tracked
/// internally, which is then used to drive the exponential backoff behavior. When a request is finally successful,
/// there are two options: reset the error count back to zero, or decrease it by some fixed amount.
///
/// If the recovery error decrease factor isn't set at all, then the error count is reset back to zero after any
/// successful response. This means that if our next request fails, the backoff duration would start back at a low
/// value. If the recovery error decrease factor is set, however, then the error count is only decreased by that fixed
/// amount after a successful response, which means that if our next request fails, the calculated backoff duration
/// would still be reasonably close to the last calculated backoff duration.
///
/// Essentially, setting a recovery error decrease factor allows the calculated backoff duration to increase/decrease
/// more smoothly between failed requests that occur close together, as illustrated in the example below.
///
/// # Missing
///
/// - Ability to set an upper bound on retry attempts before giving up.
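///
/// # Example
///
/// A rough sketch of the difference in behavior (the backoff durations and the `classifier` and `backoff` values shown
/// here are purely illustrative placeholders, not this crate's defaults):
///
/// ```ignore
/// // Assume the calculated backoff duration for error counts 1, 2, 3, ... is 1s, 2s, 4s, ...
/// //
/// // With the default behavior (reset to zero on success):
/// //   fail, fail, fail -> waits 1s, 2s, 4s  (error count is now 3)
/// //   success          -> error count reset to 0
/// //   fail             -> waits 1s again    (error count is back to 1)
/// //
/// // With a recovery error decrease factor of 1:
/// //   fail, fail, fail -> waits 1s, 2s, 4s  (error count is now 3)
/// //   success          -> error count decreased to 2
/// //   fail             -> waits 4s          (error count is back to 3)
/// let policy = RollingExponentialBackoffRetryPolicy::new(classifier, backoff)
///     .with_recovery_error_decrease_factor(Some(1));
/// ```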
#[derive(Clone)]
pub struct RollingExponentialBackoffRetryPolicy<C, L = DefaultDebugRetryLifecycle> {
    classifier: C,
    retry_lifecycle: L,
    backoff: ExponentialBackoff,
    recovery_error_decrease_factor: Option<u32>,
    error_count: Arc<AtomicU32>,
}

impl<C> RollingExponentialBackoffRetryPolicy<C> {
    /// Creates a new `RollingExponentialBackoffRetryPolicy` with the given classifier and exponential backoff strategy.
    ///
    /// On successful responses, the error count will be reset back to zero.
    pub fn new(classifier: C, backoff: ExponentialBackoff) -> Self {
        Self {
            classifier,
            retry_lifecycle: DefaultDebugRetryLifecycle,
            backoff,
            recovery_error_decrease_factor: None,
            error_count: Arc::new(AtomicU32::new(0)),
        }
    }
}

impl<C, L> RollingExponentialBackoffRetryPolicy<C, L> {
    /// Sets the recovery error decrease factor for this policy.
    ///
    /// The given value controls how much the error count should be decreased by after a successful response. If the
    /// value is `None`, then the error count will be reset back to zero after a successful response.
    ///
    /// Defaults to resetting the error count to zero after a successful response.
    pub fn with_recovery_error_decrease_factor(mut self, factor: Option<u32>) -> Self {
        self.recovery_error_decrease_factor = factor;
        self
    }

    /// Sets the retry lifecycle for this policy.
    ///
    /// `RetryLifecycle` allows defining custom hooks that are called at various points within the retry policy, such as
    /// when a request is classified as needing to be retried, when it succeeds, and so on. This can be used to add
    /// customized and contextual logging to retries.
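    ///
    /// A minimal usage sketch (`MyLoggingLifecycle` is a hypothetical `RetryLifecycle` implementation, and
    /// `classifier`, `backoff`, and `inner_service` are placeholders for the surrounding service setup):
    ///
    /// ```ignore
    /// let policy = RollingExponentialBackoffRetryPolicy::new(classifier, backoff)
    ///     .with_retry_lifecycle(MyLoggingLifecycle::default());
    ///
    /// // The configured policy is then handed to Tower's retry middleware.
    /// let service = tower::retry::Retry::new(policy, inner_service);
    /// ```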
    pub fn with_retry_lifecycle<L2>(self, retry_lifecycle: L2) -> RollingExponentialBackoffRetryPolicy<C, L2> {
        RollingExponentialBackoffRetryPolicy {
            classifier: self.classifier,
            retry_lifecycle,
            backoff: self.backoff,
            recovery_error_decrease_factor: self.recovery_error_decrease_factor,
            error_count: self.error_count,
        }
    }
}

impl<C, L, Req, Res, Error> Policy<Req, Res, Error> for RollingExponentialBackoffRetryPolicy<C, L>
where
    C: RetryClassifier<Res, Error>,
    L: RetryLifecycle<Req, Res, Error>,
    Req: Clone,
{
    type Future = Sleep;

    fn retry(&mut self, request: &mut Req, response: &mut Result<Res, Error>) -> Option<Self::Future> {
        if self.classifier.should_retry(response) {
            // We got an error response, so update our error count and figure out how long to back off. `fetch_add`
            // returns the previous value, so we add one to get the count that includes this latest error.
            let error_count = self.error_count.fetch_add(1, Relaxed) + 1;
            let backoff_dur = self.backoff.get_backoff_duration(error_count);

            self.retry_lifecycle
                .before_retry(request, response, backoff_dur, error_count);

            Some(sleep(backoff_dur))
        } else {
            self.retry_lifecycle.after_success(request, response);

            // We got a successful response, so update our error count if necessary.
            match self.recovery_error_decrease_factor {
                Some(factor) => {
                    debug!(decrease_factor = factor, "Decreasing error count after successful response.");

                    // We never expect this to fail since we never conditionally try to update: we _always_ want to
                    // decrease the error count.
                    let _ = self
                        .error_count
                        .fetch_update(AcqRel, Relaxed, |count| Some(count.saturating_sub(factor)));
                }
                None => {
                    debug!("Resetting error count to zero after successful response.");

                    self.error_count.store(0, Relaxed);
                }
            }

            None
        }
    }

    fn clone_request(&mut self, req: &Req) -> Option<Req> {
        Some(req.clone())
    }
}