Skip to content

OCPBUGS-42087: storage/cacher/ready: dynamically calculate the retryAfterSeconds #2225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/features/versioned_kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
},

genericfeatures.ResilientWatchCacheInitialization: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},
},

genericfeatures.RetryGenerateName: {
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
},

ResilientWatchCacheInitialization: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},
},

RetryGenerateName: {
Expand Down
34 changes: 22 additions & 12 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
objType := reflect.TypeOf(obj)
cacher := &Cacher{
resourcePrefix: config.ResourcePrefix,
ready: newReady(),
ready: newReady(config.Clock),
storage: config.Storage,
objectType: objType,
groupResource: config.GroupResource,
Expand Down Expand Up @@ -537,9 +537,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
var readyGeneration int
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
var ok bool
readyGeneration, ok = c.ready.checkAndReadGeneration()
var downtime time.Duration
readyGeneration, downtime, ok = c.ready.checkAndReadGeneration()
if !ok {
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
return nil, errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
}
} else {
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
Expand Down Expand Up @@ -660,7 +661,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
c.Lock()
defer c.Unlock()

if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
if generation, _, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
// We went unready or are already on a different generation.
// Avoid registering and starting the watch as it will have to be
// terminated immediately anyway.
Expand Down Expand Up @@ -708,7 +709,8 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
}

if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
_, ok := c.ready.check()
if !ok {
// If Cache is not initialized, delegate Get requests to storage
// as described in https://kep.k8s.io/4568
span.AddEvent("About to Get from underlying storage - cache not initialized")
Expand All @@ -728,7 +730,8 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
// of Get requests. We can add it if it will be really needed.

if !utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if getRV == 0 && !c.ready.check() {
_, ok := c.ready.check()
if getRV == 0 && !ok {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
span.AddEvent("About to Get from underlying storage - cache not initialized and no resourceVersion set")
Expand Down Expand Up @@ -838,13 +841,15 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}

if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() && shouldDelegateListOnNotReadyCache(opts) {
_, ok := c.ready.check()
if !ok && shouldDelegateListOnNotReadyCache(opts) {
// If Cacher is not initialized, delegate List requests to storage
// as described in https://kep.k8s.io/4568
return c.storage.GetList(ctx, key, opts, listObj)
}
} else {
if listRV == 0 && !c.ready.check() {
_, ok := c.ready.check()
if listRV == 0 && !ok {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetList(ctx, key, opts, listObj)
Expand Down Expand Up @@ -873,10 +878,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
defer span.End(500 * time.Millisecond)

if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
if !c.ready.check() {
if downtime, ok := c.ready.check(); !ok {
// If Cacher is not initialized, reject List requests
// as described in https://kep.k8s.io/4568
return errors.NewTooManyRequests("storage is (re)initializing", 1)
return errors.NewTooManyRequests("storage is (re)initializing", calculateRetryAfterForUnreadyCache(downtime))
}
} else {
if err := c.ready.wait(ctx); err != nil {
Expand Down Expand Up @@ -991,7 +996,8 @@ func (c *Cacher) Count(pathPrefix string) (int64, error) {

// ReadinessCheck implements storage.Interface.
func (c *Cacher) ReadinessCheck() error {
if !c.ready.check() {
_, ok := c.ready.check()
if !ok {
return storage.ErrStorageNotReady
}
return nil
Expand Down Expand Up @@ -1473,7 +1479,11 @@ func (c *Cacher) setInitialEventsEndBookmarkIfRequested(cacheInterval *watchCach
}
}

// Ready reports whether the cacher's ready state is currently Ready.
// The elapsed-time value that check also returns is discarded.
func (c *Cacher) Ready() bool {
_, ok := c.ready.check()
return ok
}

// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
}
Expand Down
35 changes: 26 additions & 9 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"fmt"
"sync"
"time"

"k8s.io/utils/clock"
)

type status int
Expand All @@ -43,13 +46,20 @@ type ready struct {
lock sync.RWMutex // protect the state and generation variables
restartLock sync.Mutex // protect the transition from ready to pending where the channel is recreated
waitCh chan struct{} // blocks until is ready or stopped

clock clock.Clock
lastStateChangeTime time.Time
}

func newReady() *ready {
return &ready{
func newReady(c clock.Clock) *ready {
r := &ready{
waitCh: make(chan struct{}),
state: Pending,
clock: c,
}
r.updateLastStateChangeTimeLocked()

return r
}

// done close the channel once the state is Ready or Stopped
Expand Down Expand Up @@ -100,17 +110,17 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) {
}
}

// check returns true only if it is Ready.
func (r *ready) check() bool {
_, ok := r.checkAndReadGeneration()
return ok
// check returns the time elapsed since the state last changed and whether the state is currently Ready.
func (r *ready) check() (time.Duration, bool) {
_, elapsed, ok := r.checkAndReadGeneration()
return elapsed, ok
}

// checkAndReadGeneration returns the current generation and whether it is Ready.
func (r *ready) checkAndReadGeneration() (int, bool) {
// checkAndReadGeneration returns the current generation, the time elapsed since the state last changed, and whether the state is currently Ready.
func (r *ready) checkAndReadGeneration() (int, time.Duration, bool) {
r.lock.RLock()
defer r.lock.RUnlock()
return r.generation, r.state == Ready
return r.generation, r.clock.Since(r.lastStateChangeTime), r.state == Ready
}

// set the state to Pending (false) or Ready (true), it does not have effect if the state is Stopped.
Expand All @@ -123,6 +133,7 @@ func (r *ready) set(ok bool) {
if ok && r.state == Pending {
r.state = Ready
r.generation++
r.updateLastStateChangeTimeLocked()
select {
case <-r.waitCh:
default:
Expand All @@ -139,6 +150,7 @@ func (r *ready) set(ok bool) {
default:
}
r.state = Pending
r.updateLastStateChangeTimeLocked()
}
}

Expand All @@ -148,10 +160,15 @@ func (r *ready) stop() {
defer r.lock.Unlock()
if r.state != Stopped {
r.state = Stopped
r.updateLastStateChangeTimeLocked()
}
select {
case <-r.waitCh:
default:
close(r.waitCh)
}
}

// updateLastStateChangeTimeLocked stamps lastStateChangeTime with the current
// reading of the injected clock.
// NOTE(review): the "Locked" suffix suggests callers are expected to hold
// r.lock (stop does); confirm for the remaining call sites.
func (r *ready) updateLastStateChangeTimeLocked() {
	now := r.clock.Now()
	r.lastStateChangeTime = now
}
75 changes: 57 additions & 18 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/ready_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"sync"
"testing"
"time"

testingclock "k8s.io/utils/clock/testing"
)

func Test_newReady(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
// create 10 goroutines waiting for ready
for i := 0; i < 10; i++ {
Expand All @@ -48,20 +50,20 @@ func Test_newReady(t *testing.T) {

func Test_newReadySetIdempotent(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ready.set(false)
ready.set(false)
if generation, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
if generation, _, ok := ready.checkAndReadGeneration(); generation != 0 || ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(true)
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
if generation, _, ok := ready.checkAndReadGeneration(); generation != 1 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
ready.set(false)
Expand All @@ -77,7 +79,7 @@ func Test_newReadySetIdempotent(t *testing.T) {
t.Errorf("ready should be blocking")
}
ready.set(true)
if generation, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
if generation, _, ok := ready.checkAndReadGeneration(); generation != 2 || !ok {
t.Errorf("unexpected state: generation=%v ready=%v", generation, ok)
}
for i := 0; i < 10; i++ {
Expand All @@ -92,7 +94,7 @@ func Test_newReadySetIdempotent(t *testing.T) {
func Test_newReadyRacy(t *testing.T) {
concurrency := 1000
errCh := make(chan error, concurrency)
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -123,7 +125,7 @@ func Test_newReadyRacy(t *testing.T) {

func Test_newReadyStop(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
// create 10 goroutines waiting for ready and stop
for i := 0; i < 10; i++ {
Expand All @@ -145,24 +147,24 @@ func Test_newReadyStop(t *testing.T) {
}

func Test_newReadyCheck(t *testing.T) {
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
// it starts as false
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
}
ready.set(true)
if !ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
if _, ok := ready.check(); !ok {
t.Errorf("unexpected ready state %v", ok)
}
// stop sets ready to false
ready.stop()
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
}
// can not set to true if is stopped
ready.set(true)
if ready.check() {
t.Errorf("unexpected ready state %v", ready.check())
if _, ok := ready.check(); ok {
t.Errorf("unexpected ready state %v", ok)
}
err := ready.wait(context.Background())
if err == nil {
Expand All @@ -172,7 +174,7 @@ func Test_newReadyCheck(t *testing.T) {

func Test_newReadyCancelPending(t *testing.T) {
errCh := make(chan error, 10)
ready := newReady()
ready := newReady(testingclock.NewFakeClock(time.Now()))
ready.set(false)
ctx, cancel := context.WithCancel(context.Background())
// create 10 goroutines stuck on pending
Expand All @@ -193,3 +195,40 @@ func Test_newReadyCancelPending(t *testing.T) {
}
}
}

// Test_newReadyStateChangeTimestamp verifies that the elapsed time reported by
// check restarts on every state transition (initial Pending, Ready, Pending
// again, Stopped) and keeps growing while the state is unchanged.
func Test_newReadyStateChangeTimestamp(t *testing.T) {
	fakeClock := testingclock.NewFakeClock(time.Now())
	fakeClock.SetTime(time.Now())

	ready := newReady(fakeClock)

	// stepAndExpect advances the fake clock by one minute and asserts the
	// time elapsed since the last state change.
	stepAndExpect := func(sinceLastChange time.Duration) {
		t.Helper()
		fakeClock.Step(time.Minute)
		checkReadyTransitionTime(t, ready, sinceLastChange)
	}

	// Freshly created: the timestamp is taken at construction.
	stepAndExpect(time.Minute)

	// Pending -> Ready resets the timestamp.
	ready.set(true)
	stepAndExpect(time.Minute)
	stepAndExpect(2 * time.Minute)

	// Ready -> Pending resets the timestamp.
	ready.set(false)
	stepAndExpect(time.Minute)
	stepAndExpect(2 * time.Minute)

	// Pending -> Ready again.
	ready.set(true)
	stepAndExpect(time.Minute)

	// Ready -> Stopped resets the timestamp one last time.
	ready.stop()
	stepAndExpect(time.Minute)
	stepAndExpect(2 * time.Minute)
}

// checkReadyTransitionTime asserts that the time elapsed since r last changed
// state, as reported by r.check, equals expectedLastStateChangeDuration.
// It is marked as a test helper so that failures are attributed to the
// calling line rather than to this function.
func checkReadyTransitionTime(t *testing.T, r *ready, expectedLastStateChangeDuration time.Duration) {
	t.Helper()
	if lastStateChangeDuration, _ := r.check(); lastStateChangeDuration != expectedLastStateChangeDuration {
		t.Errorf("unexpected last state change duration: %v, expected: %v", lastStateChangeDuration, expectedLastStateChangeDuration)
	}
}
10 changes: 10 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package cacher

import (
"math"
"strings"
"time"
)

// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
Expand All @@ -44,3 +46,11 @@ func hasPathPrefix(s, pathPrefix string) bool {
}
return false
}

// calculateRetryAfterForUnreadyCache derives the Retry-After value, in whole
// seconds, from how long the cache has been unready. The value grows
// exponentially with the downtime (e^(0.06 * seconds)), is truncated to an
// integer, and is clamped to the range [1, 30].
func calculateRetryAfterForUnreadyCache(downtime time.Duration) int {
	const (
		growthRate    = 0.06
		minRetryAfter = 1
		maxRetryAfter = 30
	)

	seconds := math.Exp(growthRate * downtime.Seconds())
	switch {
	case seconds <= minRetryAfter:
		// Covers zero and negative downtime.
		return minRetryAfter
	case seconds >= maxRetryAfter:
		// Also covers overflow of the exponential to +Inf.
		return maxRetryAfter
	}
	return int(seconds)
}
Loading