From 324c320b1253fb78f7288d560a52030ef1f8415f Mon Sep 17 00:00:00 2001
From: David Eads
Date: Wed, 15 Jan 2025 14:26:39 -0500
Subject: [PATCH 1/3] DO NOT MERGE: add instrumentation to chase very short watches

---
 .../src/k8s.io/apiserver/pkg/storage/cacher/cacher.go    | 9 +++++++++
 staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go | 1 +
 2 files changed, 10 insertions(+)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 48791bd7b63b9..fb8f5922a9131 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -450,6 +450,8 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 }
 
 func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
+	fmt.Printf("#### 2a groupResource=%v startCaching\n", c.groupResource)
+
 	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
 	// It is safe to use the cache after a successful list until a disconnection.
 	// We start with usable (write) locked. The below OnReplace function will
@@ -465,6 +467,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	})
 	defer func() {
 		if successfulList {
+			fmt.Printf("#### 2b groupResource=%v setting to false\n", c.groupResource)
 			c.ready.set(false)
 		}
 	}()
@@ -477,6 +480,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
 		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
 	}
+	fmt.Printf("#### 2e groupResource=%v exiting\n", c.groupResource)
 }
 
 // Versioner implements storage.Interface.
@@ -536,17 +540,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 
 	var readyGeneration int
 	if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
+		fmt.Printf("#### 1a groupResource=%v\n", c.groupResource)
 		var ok bool
 		readyGeneration, ok = c.ready.checkAndReadGeneration()
 		if !ok {
 			return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
 		}
 	} else {
+		fmt.Printf("#### 1b groupResource=%v\n", c.groupResource)
 		readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
 		if err != nil {
 			return nil, errors.NewServiceUnavailable(err.Error())
 		}
 	}
+	fmt.Printf("#### 1c groupResource=%v, originalReadyGeneration=%d\n", c.groupResource, readyGeneration)
 
 	// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
 	scope := namespacedName{}
@@ -659,6 +666,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		defer c.Unlock()
 
 		if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
+			fmt.Printf("#### 1o groupResource=%v currentReadyGeneration=%d, originalReadyGeneration=%d, ok=%v\n", c.groupResource, generation, readyGeneration, ok)
 			// We went unready or are already on a different generation.
 			// Avoid registering and starting the watch as it will have to be
 			// terminated immediately anyway.
@@ -680,6 +688,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	}()
 
 	if !addedWatcher {
+		fmt.Printf("#### 1x groupResource=%v returning the immediate closer thing\n", c.groupResource)
 		// Watcher isn't really started at this point, so it's safe to just drop it.
 		//
 		// We're simulating the immediate watch termination, which boils down to simply
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
index 012d6d585c9f1..10b9fd0e27d1f 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
@@ -117,6 +117,7 @@ func (r *ready) checkAndReadGeneration() (int, bool) {
 func (r *ready) set(ok bool) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
+
 	if r.state == Stopped {
 		return
 	}
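The probes above follow a simple breadcrumb pattern: every print carries a fixed, grep-able "####" prefix, a short site ID ("1a", "2b", ...) naming the code location, and the groupResource as a correlation key, so the interleaved output of many goroutines can be filtered and ordered per resource. A minimal standalone sketch of that pattern follows; the trace helper, site IDs, and resource keys are made up for illustration and are not part of the patch.

package main

import (
	"fmt"
	"sync"
)

// trace prints a breadcrumb in the same shape the patch uses: a grep-able
// "####" prefix, a site ID naming the code location, and a correlation key
// so output from concurrent goroutines can be grouped per resource.
func trace(site, key string) {
	fmt.Printf("#### %s groupResource=%v\n", site, key)
}

func main() {
	var wg sync.WaitGroup
	for _, key := range []string{"apps/deployments", "core/pods"} {
		wg.Add(1)
		go func(key string) {
			defer wg.Done()
			trace("1a", key) // watch request entered
			trace("1c", key) // readiness generation read
			trace("2e", key) // caching loop exited
		}(key)
	}
	wg.Wait()
}

Piping the output through something like grep 'groupResource=core/pods' reconstructs a single resource's timeline, which is how markers of this shape are typically consumed.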
From 767dc1953cd338f0e80bb63d2c54dcf4a2dacf4e Mon Sep 17 00:00:00 2001
From: David Eads
Date: Wed, 15 Jan 2025 17:47:04 -0500
Subject: [PATCH 2/3] more instrumentation

---
 .../apiserver/pkg/storage/cacher/cache_watcher.go |  7 +++++++
 .../k8s.io/apiserver/pkg/storage/cacher/cacher.go | 12 ++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
index e07d00d310418..9883136dbae10 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
@@ -124,15 +124,20 @@ func (c *cacheWatcher) Stop() {
 
 // we rely on the fact that stopLocked is actually protected by Cacher.Lock()
 func (c *cacheWatcher) stopLocked() {
+	fmt.Printf("#### 4a groupResource=%v \n", c.groupResource)
 	if !c.stopped {
+		fmt.Printf("#### 4b groupResource=%v \n", c.groupResource)
 		c.stopped = true
 		// stop without draining the input channel was requested.
 		if !c.drainInputBuffer {
+			fmt.Printf("#### 4c groupResource=%v \n", c.groupResource)
 			close(c.done)
 		}
+		fmt.Printf("#### 4d groupResource=%v \n", c.groupResource)
 		close(c.input)
 	}
+	fmt.Printf("#### 4e groupResource=%v \n", c.groupResource)
 
 	// Even if the watcher was already stopped, if it previously was
 	// using draining mode and it's not using it now we need to
 	// close the done channel now. Otherwise we could leak the
@@ -140,8 +145,10 @@ func (c *cacheWatcher) stopLocked() {
 	// into result channel, the channel will be full and there will
 	// already be noone on the processing the events on the receiving end.
 	if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
+		fmt.Printf("#### 4f groupResource=%v \n", c.groupResource)
 		close(c.done)
 	}
+	fmt.Printf("#### 4g groupResource=%v \n", c.groupResource)
 }
 
 func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index fb8f5922a9131..b6b6897daa5a2 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -661,6 +661,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	}
 
 	addedWatcher := false
+	fmt.Printf("#### 1n groupResource=%v \n", c.groupResource)
 	func() {
 		c.Lock()
 		defer c.Unlock()
@@ -673,6 +674,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 			return
 		}
 
+		fmt.Printf("#### 1p groupResource=%v \n", c.groupResource)
 		// Update watcher.forget function once we can compute it.
 		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 		// Update the bookMarkAfterResourceVersion
@@ -680,12 +682,16 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 		addedWatcher = true
 
+		fmt.Printf("#### 1q groupResource=%v \n", c.groupResource)
 		// Add it to the queue only when the client support watch bookmarks.
 		if watcher.allowWatchBookmarks {
 			c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
 		}
 		c.watcherIdx++
+
+		fmt.Printf("#### 1r groupResource=%v \n", c.groupResource)
 	}()
+	fmt.Printf("#### 1s groupResource=%v \n", c.groupResource)
 
 	if !addedWatcher {
 		fmt.Printf("#### 1x groupResource=%v returning the immediate closer thing\n", c.groupResource)
@@ -696,6 +702,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		return newImmediateCloseWatcher(), nil
 	}
 
+	fmt.Printf("#### 1y groupResource=%v \n", c.groupResource)
 	go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
 	return watcher, nil
 }
@@ -1342,13 +1349,18 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
 		c.Lock()
 		defer c.Unlock()
 
+		fmt.Printf("#### 3a groupResource=%v \n", c.groupResource)
+
 		w.setDrainInputBufferLocked(drainWatcher)
+		fmt.Printf("#### 3b groupResource=%v \n", c.groupResource)
 
 		// It's possible that the watcher is already not in the structure (e.g. in case of
 		// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
 		// on a watcher multiple times.
 		c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
+		fmt.Printf("#### 3c groupResource=%v \n", c.groupResource)
 		c.stopWatcherLocked(w)
+		fmt.Printf("#### 3d groupResource=%v \n", c.groupResource)
 	}
 }
 
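Patch 2 traces every branch of stopLocked because the function is deliberately idempotent: as the comment in the diff notes, simultaneous Stop() and terminateAllWatchers() may both reach it, and closing an already-closed channel would panic. A reduced sketch of that guard is below; the watcher type and its fields are hypothetical stand-ins for cacheWatcher, kept only to show the stopped-flag pattern.

package main

import "fmt"

// watcher is a hypothetical reduction of cacheWatcher: stopLocked must
// tolerate being called twice, so a stopped flag guards the channel
// closes, because closing an already-closed channel panics.
type watcher struct {
	stopped          bool
	drainInputBuffer bool
	input            chan struct{}
	done             chan struct{}
}

func (w *watcher) stopLocked() {
	if !w.stopped {
		w.stopped = true
		// When draining was not requested, signal consumers immediately.
		if !w.drainInputBuffer {
			close(w.done)
		}
		close(w.input)
	}
}

func main() {
	w := &watcher{input: make(chan struct{}), done: make(chan struct{})}
	w.stopLocked()
	w.stopLocked() // no-op: the stopped flag prevents a double close
	fmt.Println("stopLocked is safe to call twice")
}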
From 402072d687b278b4dfc55a4e137437addff61d84 Mon Sep 17 00:00:00 2001
From: David Eads
Date: Wed, 15 Jan 2025 18:05:36 -0500
Subject: [PATCH 3/3] print stack on stop

---
 .../src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
index 9883136dbae10..899a5692199c0 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
@@ -19,6 +19,8 @@ package cacher
 import (
 	"context"
 	"fmt"
+	"os"
+	"runtime/debug"
 	"sync"
 	"time"
 
@@ -119,6 +121,9 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
 
 // Implements watch.Interface.
 func (c *cacheWatcher) Stop() {
+	fmt.Fprintf(os.Stderr, "#### 6a groupResource=%v stop stack next\n", c.groupResource)
+	debug.PrintStack()
+	fmt.Fprintf(os.Stderr, "#### 6b groupResource=%v \n", c.groupResource)
 	c.forget(false)
 }
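Patch 3 upgrades the breadcrumb at the suspected call site to a full goroutine stack dump, which identifies the caller that terminates a watch early. Below is a standalone sketch of the same technique; the watcher type, groupResource field, and forget callback are illustrative stand-ins for cacheWatcher and its forget function, and only debug.PrintStack plus the marker shape mirror the patch.

package main

import (
	"fmt"
	"os"
	"runtime/debug"
)

// watcher stands in for cacheWatcher; forget stands in for the callback
// installed by forgetWatcher. Both names are hypothetical.
type watcher struct {
	groupResource string
	forget        func(drainWatcher bool)
}

// Stop mirrors the instrumented Stop above: print a tagged marker, dump
// the current goroutine's stack so the caller that triggered the stop is
// visible in the log, then run the forget callback.
func (w *watcher) Stop() {
	fmt.Fprintf(os.Stderr, "#### groupResource=%v stop stack next\n", w.groupResource)
	debug.PrintStack()
	w.forget(false)
}

func main() {
	w := &watcher{
		groupResource: "example.io/widgets",
		forget: func(drainWatcher bool) {
			fmt.Fprintf(os.Stderr, "#### groupResource=example.io/widgets forgotten drain=%v\n", drainWatcher)
		},
	}
	w.Stop() // the printed stack shows main.main -> (*watcher).Stop
}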