Skip to content

Commit 324c320

Browse files
committed
DO NOT MERGE: add instrumenting to chase very short watches
1 parent 309f240 commit 324c320

File tree

2 files changed

+10
-0
lines changed

2 files changed

+10
-0
lines changed

staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go

+9
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,8 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
450450
}
451451

452452
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
453+
fmt.Printf("#### 2a groupResource=%v startCaching\n", c.groupResource)
454+
453455
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
454456
// It is safe to use the cache after a successful list until a disconnection.
455457
// We start with usable (write) locked. The below OnReplace function will
@@ -465,6 +467,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
465467
})
466468
defer func() {
467469
if successfulList {
470+
fmt.Printf("#### 2b groupResource=%v setting to false\n", c.groupResource)
468471
c.ready.set(false)
469472
}
470473
}()
@@ -477,6 +480,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
477480
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
478481
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
479482
}
483+
fmt.Printf("#### 2e groupResource=%v exiting\n", c.groupResource)
480484
}
481485

482486
// Versioner implements storage.Interface.
@@ -536,17 +540,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
536540

537541
var readyGeneration int
538542
if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
543+
fmt.Printf("#### 1a groupResource=%v\n", c.groupResource)
539544
var ok bool
540545
readyGeneration, ok = c.ready.checkAndReadGeneration()
541546
if !ok {
542547
return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
543548
}
544549
} else {
550+
fmt.Printf("#### 1b groupResource=%v\n", c.groupResource)
545551
readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
546552
if err != nil {
547553
return nil, errors.NewServiceUnavailable(err.Error())
548554
}
549555
}
556+
fmt.Printf("#### 1c groupResource=%v, originalReadyGeneration=%d\n", c.groupResource, readyGeneration)
550557

551558
// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
552559
scope := namespacedName{}
@@ -659,6 +666,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
659666
defer c.Unlock()
660667

661668
if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
669+
fmt.Printf("#### 1o groupResource=%v currentReadyGeneration=%d, originalReadyGeneration=%d, ok=%v\n", c.groupResource, generation, readyGeneration, ok)
662670
// We went unready or are already on a different generation.
663671
// Avoid registering and starting the watch as it will have to be
664672
// terminated immediately anyway.
@@ -680,6 +688,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
680688
}()
681689

682690
if !addedWatcher {
691+
fmt.Printf("#### 1x groupResource=%v returning the immediate closer thing\n", c.groupResource)
683692
// Watcher isn't really started at this point, so it's safe to just drop it.
684693
//
685694
// We're simulating the immediate watch termination, which boils down to simply

staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ func (r *ready) checkAndReadGeneration() (int, bool) {
117117
func (r *ready) set(ok bool) {
118118
r.lock.Lock()
119119
defer r.lock.Unlock()
120+
120121
if r.state == Stopped {
121122
return
122123
}

0 commit comments

Comments
 (0)