|
| 1 | +/* |
| 2 | +Copyright 2023 The Kubernetes Authors. |
| 3 | +
|
| 4 | +Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | +you may not use this file except in compliance with the License. |
| 6 | +You may obtain a copy of the License at |
| 7 | +
|
| 8 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | +
|
| 10 | +Unless required by applicable law or agreed to in writing, software |
| 11 | +distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | +See the License for the specific language governing permissions and |
| 14 | +limitations under the License. |
| 15 | +*/ |
| 16 | + |
| 17 | +package wait |
| 18 | + |
| 19 | +import ( |
| 20 | + "context" |
| 21 | + "math" |
| 22 | + "time" |
| 23 | + |
| 24 | + "k8s.io/apimachinery/pkg/util/runtime" |
| 25 | + "k8s.io/utils/clock" |
| 26 | +) |
| 27 | + |
// Backoff carries the parameters, and the evolving state, of a backoff
// schedule. Step reads these fields to produce the next sleep and updates
// Duration and Steps as the schedule advances.
type Backoff struct {
	// Duration is the base sleep. It starts as the initial duration and is
	// the value Step grows on each iteration.
	Duration time.Duration
	// Factor multiplies Duration on each iteration, provided it is non-zero
	// and neither Steps nor Cap has stopped further growth. It should not be
	// negative. Jitter is never folded into the updated Duration.
	Factor float64
	// Jitter, when positive, makes each sleep equal to the duration plus an
	// extra amount picked uniformly at random between zero and
	// `jitter*duration`.
	Jitter float64
	// Steps is the number of remaining iterations in which Duration may
	// still change (reaching Cap can end growth sooner). When it is not
	// positive, Duration is left as is. Together with Factor and Cap it
	// defines an exponential backoff.
	Steps int
	// Cap bounds the revised Duration. When multiplying by Factor would push
	// Duration above Cap, Duration is set to Cap and Steps is set to zero.
	// The bound is only enforced when Cap is positive.
	Cap time.Duration
}
| 53 | + |
| 54 | +// Step (1) returns an amount of time to sleep determined by the |
| 55 | +// original Duration and Jitter and (2) mutates the provided Backoff |
| 56 | +// to update its Steps and Duration. |
| 57 | +func (b *Backoff) Step() time.Duration { |
| 58 | + if b.Steps < 1 { |
| 59 | + if b.Jitter > 0 { |
| 60 | + return Jitter(b.Duration, b.Jitter) |
| 61 | + } |
| 62 | + return b.Duration |
| 63 | + } |
| 64 | + b.Steps-- |
| 65 | + |
| 66 | + duration := b.Duration |
| 67 | + |
| 68 | + // calculate the next step |
| 69 | + if b.Factor != 0 { |
| 70 | + b.Duration = time.Duration(float64(b.Duration) * b.Factor) |
| 71 | + if b.Cap > 0 && b.Duration > b.Cap { |
| 72 | + b.Duration = b.Cap |
| 73 | + b.Steps = 0 |
| 74 | + } |
| 75 | + } |
| 76 | + |
| 77 | + if b.Jitter > 0 { |
| 78 | + duration = Jitter(duration, b.Jitter) |
| 79 | + } |
| 80 | + return duration |
| 81 | +} |
| 82 | + |
| 83 | +// Until loops until stop channel is closed, running f every period. |
| 84 | +// |
| 85 | +// Until is syntactic sugar on top of JitterUntil with zero jitter factor and |
| 86 | +// with sliding = true (which means the timer for period starts after the f |
| 87 | +// completes). |
| 88 | +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { |
| 89 | + JitterUntil(f, period, 0.0, true, stopCh) |
| 90 | +} |
| 91 | + |
| 92 | +// UntilWithContext loops until context is done, running f every period. |
| 93 | +// |
| 94 | +// UntilWithContext is syntactic sugar on top of JitterUntilWithContext |
| 95 | +// with zero jitter factor and with sliding = true (which means the timer |
| 96 | +// for period starts after the f completes). |
| 97 | +func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { |
| 98 | + JitterUntilWithContext(ctx, f, period, 0.0, true) |
| 99 | +} |
| 100 | + |
| 101 | +// NonSlidingUntil loops until stop channel is closed, running f every |
| 102 | +// period. |
| 103 | +// |
| 104 | +// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter |
| 105 | +// factor, with sliding = false (meaning the timer for period starts at the same |
| 106 | +// time as the function starts). |
| 107 | +func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { |
| 108 | + JitterUntil(f, period, 0.0, false, stopCh) |
| 109 | +} |
| 110 | + |
| 111 | +// NonSlidingUntilWithContext loops until context is done, running f every |
| 112 | +// period. |
| 113 | +// |
| 114 | +// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext |
| 115 | +// with zero jitter factor, with sliding = false (meaning the timer for period |
| 116 | +// starts at the same time as the function starts). |
| 117 | +func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { |
| 118 | + JitterUntilWithContext(ctx, f, period, 0.0, false) |
| 119 | +} |
| 120 | + |
| 121 | +// JitterUntil loops until stop channel is closed, running f every period. |
| 122 | +// |
| 123 | +// If jitterFactor is positive, the period is jittered before every run of f. |
| 124 | +// If jitterFactor is not positive, the period is unchanged and not jittered. |
| 125 | +// |
| 126 | +// If sliding is true, the period is computed after f runs. If it is false then |
| 127 | +// period includes the runtime for f. |
| 128 | +// |
| 129 | +// Close stopCh to stop. f may not be invoked if stop channel is already |
| 130 | +// closed. Pass NeverStop to if you don't want it stop. |
| 131 | +func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { |
| 132 | + BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh) |
| 133 | +} |
| 134 | + |
| 135 | +// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager. |
| 136 | +// |
| 137 | +// If sliding is true, the period is computed after f runs. If it is false then |
| 138 | +// period includes the runtime for f. |
| 139 | +func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { |
| 140 | + var t clock.Timer |
| 141 | + for { |
| 142 | + select { |
| 143 | + case <-stopCh: |
| 144 | + return |
| 145 | + default: |
| 146 | + } |
| 147 | + |
| 148 | + if !sliding { |
| 149 | + t = backoff.Backoff() |
| 150 | + } |
| 151 | + |
| 152 | + func() { |
| 153 | + defer runtime.HandleCrash() |
| 154 | + f() |
| 155 | + }() |
| 156 | + |
| 157 | + if sliding { |
| 158 | + t = backoff.Backoff() |
| 159 | + } |
| 160 | + |
| 161 | + // NOTE: b/c there is no priority selection in golang |
| 162 | + // it is possible for this to race, meaning we could |
| 163 | + // trigger t.C and stopCh, and t.C select falls through. |
| 164 | + // In order to mitigate we re-check stopCh at the beginning |
| 165 | + // of every loop to prevent extra executions of f(). |
| 166 | + select { |
| 167 | + case <-stopCh: |
| 168 | + if !t.Stop() { |
| 169 | + <-t.C() |
| 170 | + } |
| 171 | + return |
| 172 | + case <-t.C(): |
| 173 | + } |
| 174 | + } |
| 175 | +} |
| 176 | + |
| 177 | +// JitterUntilWithContext loops until context is done, running f every period. |
| 178 | +// |
| 179 | +// If jitterFactor is positive, the period is jittered before every run of f. |
| 180 | +// If jitterFactor is not positive, the period is unchanged and not jittered. |
| 181 | +// |
| 182 | +// If sliding is true, the period is computed after f runs. If it is false then |
| 183 | +// period includes the runtime for f. |
| 184 | +// |
| 185 | +// Cancel context to stop. f may not be invoked if context is already expired. |
| 186 | +func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) { |
| 187 | + JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done()) |
| 188 | +} |
| 189 | + |
| 190 | +// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides |
| 191 | +// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff() |
| 192 | +// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in |
| 193 | +// undetermined behavior. |
| 194 | +// The BackoffManager is supposed to be called in a single-threaded environment. |
| 195 | +type BackoffManager interface { |
| 196 | + Backoff() clock.Timer |
| 197 | +} |
| 198 | + |
| 199 | +type exponentialBackoffManagerImpl struct { |
| 200 | + backoff *Backoff |
| 201 | + backoffTimer clock.Timer |
| 202 | + lastBackoffStart time.Time |
| 203 | + initialBackoff time.Duration |
| 204 | + backoffResetDuration time.Duration |
| 205 | + clock clock.Clock |
| 206 | +} |
| 207 | + |
| 208 | +// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and |
| 209 | +// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset. |
| 210 | +// This backoff manager is used to reduce load during upstream unhealthiness. |
| 211 | +func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager { |
| 212 | + return &exponentialBackoffManagerImpl{ |
| 213 | + backoff: &Backoff{ |
| 214 | + Duration: initBackoff, |
| 215 | + Factor: backoffFactor, |
| 216 | + Jitter: jitter, |
| 217 | + |
| 218 | + // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not |
| 219 | + // what we ideally need here, we set it to max int and assume we will never use up the steps |
| 220 | + Steps: math.MaxInt32, |
| 221 | + Cap: maxBackoff, |
| 222 | + }, |
| 223 | + backoffTimer: nil, |
| 224 | + initialBackoff: initBackoff, |
| 225 | + lastBackoffStart: c.Now(), |
| 226 | + backoffResetDuration: resetDuration, |
| 227 | + clock: c, |
| 228 | + } |
| 229 | +} |
| 230 | + |
| 231 | +func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration { |
| 232 | + if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration { |
| 233 | + b.backoff.Steps = math.MaxInt32 |
| 234 | + b.backoff.Duration = b.initialBackoff |
| 235 | + } |
| 236 | + b.lastBackoffStart = b.clock.Now() |
| 237 | + return b.backoff.Step() |
| 238 | +} |
| 239 | + |
| 240 | +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff. |
| 241 | +// The returned timer must be drained before calling Backoff() the second time |
| 242 | +func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer { |
| 243 | + if b.backoffTimer == nil { |
| 244 | + b.backoffTimer = b.clock.NewTimer(b.getNextBackoff()) |
| 245 | + } else { |
| 246 | + b.backoffTimer.Reset(b.getNextBackoff()) |
| 247 | + } |
| 248 | + return b.backoffTimer |
| 249 | +} |
| 250 | + |
| 251 | +type jitteredBackoffManagerImpl struct { |
| 252 | + clock clock.Clock |
| 253 | + duration time.Duration |
| 254 | + jitter float64 |
| 255 | + backoffTimer clock.Timer |
| 256 | +} |
| 257 | + |
| 258 | +// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter |
| 259 | +// is negative, backoff will not be jittered. |
| 260 | +func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager { |
| 261 | + return &jitteredBackoffManagerImpl{ |
| 262 | + clock: c, |
| 263 | + duration: duration, |
| 264 | + jitter: jitter, |
| 265 | + backoffTimer: nil, |
| 266 | + } |
| 267 | +} |
| 268 | + |
| 269 | +func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { |
| 270 | + jitteredPeriod := j.duration |
| 271 | + if j.jitter > 0.0 { |
| 272 | + jitteredPeriod = Jitter(j.duration, j.jitter) |
| 273 | + } |
| 274 | + return jitteredPeriod |
| 275 | +} |
| 276 | + |
| 277 | +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff. |
| 278 | +// The returned timer must be drained before calling Backoff() the second time |
| 279 | +func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer { |
| 280 | + backoff := j.getNextBackoff() |
| 281 | + if j.backoffTimer == nil { |
| 282 | + j.backoffTimer = j.clock.NewTimer(backoff) |
| 283 | + } else { |
| 284 | + j.backoffTimer.Reset(backoff) |
| 285 | + } |
| 286 | + return j.backoffTimer |
| 287 | +} |
| 288 | + |
| 289 | +// ExponentialBackoff repeats a condition check with exponential backoff. |
| 290 | +// |
| 291 | +// It repeatedly checks the condition and then sleeps, using `backoff.Step()` |
| 292 | +// to determine the length of the sleep and adjust Duration and Steps. |
| 293 | +// Stops and returns as soon as: |
| 294 | +// 1. the condition check returns true or an error, |
| 295 | +// 2. `backoff.Steps` checks of the condition have been done, or |
| 296 | +// 3. a sleep truncated by the cap on duration has been completed. |
| 297 | +// In case (1) the returned error is what the condition function returned. |
| 298 | +// In all other cases, ErrWaitTimeout is returned. |
| 299 | +func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { |
| 300 | + for backoff.Steps > 0 { |
| 301 | + if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { |
| 302 | + return err |
| 303 | + } |
| 304 | + if backoff.Steps == 1 { |
| 305 | + break |
| 306 | + } |
| 307 | + time.Sleep(backoff.Step()) |
| 308 | + } |
| 309 | + return ErrWaitTimeout |
| 310 | +} |
| 311 | + |
| 312 | +// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never |
| 313 | +// exceeds the deadline specified by the request context. |
| 314 | +func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error { |
| 315 | + for backoff.Steps > 0 { |
| 316 | + select { |
| 317 | + case <-ctx.Done(): |
| 318 | + return ctx.Err() |
| 319 | + default: |
| 320 | + } |
| 321 | + |
| 322 | + if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok { |
| 323 | + return err |
| 324 | + } |
| 325 | + |
| 326 | + if backoff.Steps == 1 { |
| 327 | + break |
| 328 | + } |
| 329 | + |
| 330 | + waitBeforeRetry := backoff.Step() |
| 331 | + select { |
| 332 | + case <-ctx.Done(): |
| 333 | + return ctx.Err() |
| 334 | + case <-time.After(waitBeforeRetry): |
| 335 | + } |
| 336 | + } |
| 337 | + |
| 338 | + return ErrWaitTimeout |
| 339 | +} |
0 commit comments