Commit b8d827c
Merge pull request #11757 from sbueringer/pr-poc-watch-timeout
🐛 ClusterCache: Increase timeout for informer List+Watch calls from 10s to 11m
2 parents 5c45fbd + f48063a commit b8d827c

File tree: 16 files changed, +88 −16 lines changed
controllers/clustercache/cluster_accessor.go (8 additions, 3 deletions)

@@ -54,6 +54,10 @@ type clusterAccessor struct {
 	// and health checking information (e.g. lastProbeSuccessTimestamp, consecutiveFailures).
 	// lockedStateLock must be *always* held (via lock or rLock) before accessing this field.
 	lockedState clusterAccessorLockedState
+
+	// cacheCtx is the ctx used when starting the cache.
+	// This ctx can be used by the ClusterCache to stop the cache.
+	cacheCtx context.Context //nolint:containedctx
 }
 
 // clusterAccessorConfig is the config of the clusterAccessor.
@@ -211,10 +215,11 @@ type clusterAccessorLockedHealthCheckingState struct {
 }
 
 // newClusterAccessor creates a new clusterAccessor.
-func newClusterAccessor(cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
+func newClusterAccessor(cacheCtx context.Context, cluster client.ObjectKey, clusterAccessorConfig *clusterAccessorConfig) *clusterAccessor {
 	return &clusterAccessor{
-		cluster: cluster,
-		config:  clusterAccessorConfig,
+		cacheCtx: cacheCtx,
+		cluster:  cluster,
+		config:   clusterAccessorConfig,
 	}
 }

controllers/clustercache/cluster_accessor_client.go (19 additions, 5 deletions)

@@ -100,7 +100,7 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect
 	}
 
 	log.V(6).Info("Creating cached client and cache")
-	cachedClient, cache, err := createCachedClient(ctx, ca.config, restConfig, httpClient, mapper)
+	cachedClient, cache, err := createCachedClient(ctx, ca.cacheCtx, ca.config, restConfig, httpClient, mapper)
 	if err != nil {
 		return nil, err
 	}
@@ -212,23 +212,37 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien
 }
 
 // createCachedClient creates a cached client for the given cluster, based on the rest.Config.
-func createCachedClient(ctx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
+func createCachedClient(ctx, cacheCtx context.Context, clusterAccessorConfig *clusterAccessorConfig, config *rest.Config, httpClient *http.Client, mapper meta.RESTMapper) (client.Client, *stoppableCache, error) {
+	// This config will only be used for List and Watch calls of informers
+	// because we don't want these requests to time out after the regular timeout
+	// of Options.Client.Timeout (default 10s).
+	// Lists of informers have no timeouts set.
+	// Watches of informers are timing out per default after [5m, 2*5m].
+	// https://github.com/kubernetes/client-go/blob/v0.32.0/tools/cache/reflector.go#L53-L55
+	// We are setting 11m to set a timeout for List calls without influencing Watch calls.
+	configWith11mTimeout := rest.CopyConfig(config)
+	configWith11mTimeout.Timeout = 11 * time.Minute
+	httpClientWith11mTimeout, err := rest.HTTPClientFor(configWith11mTimeout)
+	if err != nil {
+		return nil, nil, errors.Wrapf(err, "error creating cache: error creating HTTP client")
+	}
+
 	// Create the cache for the cluster.
 	cacheOptions := cache.Options{
-		HTTPClient: httpClient,
+		HTTPClient: httpClientWith11mTimeout,
 		Scheme:     clusterAccessorConfig.Scheme,
 		Mapper:     mapper,
 		SyncPeriod: clusterAccessorConfig.Cache.SyncPeriod,
 		ByObject:   clusterAccessorConfig.Cache.ByObject,
 	}
-	remoteCache, err := cache.New(config, cacheOptions)
+	remoteCache, err := cache.New(configWith11mTimeout, cacheOptions)
 	if err != nil {
 		return nil, nil, errors.Wrapf(err, "error creating cache")
 	}
 
 	// Use a context that is independent of the passed in context, so the cache doesn't get stopped
 	// when the passed in context is canceled.
-	cacheCtx, cacheCtxCancel := context.WithCancel(context.Background())
+	cacheCtx, cacheCtxCancel := context.WithCancel(cacheCtx)
 
 	// We need to be able to stop the cache's shared informers, so wrap this in a stoppableCache.
 	cache := &stoppableCache{
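
The [5m, 2*5m] range cited in the new comment is client-go's reflector picking a randomized server-side timeout for each Watch call. A self-contained sketch of that arithmetic (mirroring the cited reflector.go lines rather than calling them) shows why a value just above the 10m worst case bounds List calls without ever firing before a Watch ends on its own:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	minWatchTimeout := 5 * time.Minute
	clientTimeout := 11 * time.Minute

	for i := 0; i < 5; i++ {
		// Mirrors reflector.go:
		//   timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		// i.e. a uniform draw from [5m, 10m).
		watchTimeout := time.Duration(minWatchTimeout.Seconds()*(rand.Float64()+1.0)) * time.Second
		fmt.Printf("watch ends after %v; 11m client timeout fires first: %t\n",
			watchTimeout.Round(time.Second), clientTimeout < watchTimeout)
	}
}
```

Lists, by contrast, previously ran with no informer-level timeout at all, so the 11m client timeout finally gives them an upper bound.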

controllers/clustercache/cluster_accessor_test.go (4 additions, 4 deletions)

@@ -66,7 +66,7 @@ func TestConnect(t *testing.T) {
 			Indexes: []CacheOptionsIndex{NodeProviderIDIndex},
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	// Connect when kubeconfig Secret doesn't exist (should fail)
 	err := accessor.Connect(ctx)
@@ -164,7 +164,7 @@ func TestDisconnect(t *testing.T) {
 			Timeout: 10 * time.Second,
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	// Connect (so we can disconnect afterward)
 	g.Expect(accessor.Connect(ctx)).To(Succeed())
@@ -271,7 +271,7 @@ func TestHealthCheck(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			g := NewWithT(t)
 
-			accessor := newClusterAccessor(clusterKey, &clusterAccessorConfig{
+			accessor := newClusterAccessor(context.Background(), clusterKey, &clusterAccessorConfig{
 				HealthProbe: &clusterAccessorHealthProbeConfig{
 					Timeout:          5 * time.Second,
 					FailureThreshold: 5,
@@ -324,7 +324,7 @@ func TestWatch(t *testing.T) {
 			Timeout: 10 * time.Second,
 		},
 	}, nil)
-	accessor := newClusterAccessor(clusterKey, config)
+	accessor := newClusterAccessor(context.Background(), clusterKey, config)
 
 	tw := &testWatcher{}
 	wi := WatcherOptions{

controllers/clustercache/cluster_cache.go (18 additions, 1 deletion)

@@ -295,10 +295,14 @@ func SetupWithManager(ctx context.Context, mgr manager.Manager, options Options,
 		log.Info("Couldn't find controller Pod metadata, the ClusterCache will always access the cluster it is running on using the regular apiserver endpoint")
 	}
 
+	cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())
+
 	cc := &clusterCache{
 		client:                mgr.GetClient(),
 		clusterAccessorConfig: buildClusterAccessorConfig(mgr.GetScheme(), options, controllerPodMetadata),
 		clusterAccessors:      make(map[client.ObjectKey]*clusterAccessor),
+		cacheCtx:              cacheCtx,
+		cacheCtxCancel:        cacheCtxCancel,
 	}
 
 	err := ctrl.NewControllerManagedBy(mgr).
@@ -331,6 +335,12 @@ type clusterCache struct {
 	// This information is necessary so we can enqueue reconcile.Requests for reconcilers that
 	// got a cluster source via GetClusterSource.
 	clusterSources []clusterSource
+
+	// cacheCtx is passed to clusterAccessors to be used when starting caches.
+	cacheCtx context.Context //nolint:containedctx
+
+	// cacheCtxCancel is used during Shutdown to stop caches.
+	cacheCtxCancel context.CancelCauseFunc
 }
 
 // clusterSource stores the necessary information so we can enqueue reconcile.Requests for reconcilers that
@@ -523,7 +533,7 @@ func (cc *clusterCache) getOrCreateClusterAccessor(cluster client.ObjectKey) *cl
 
 	accessor, ok := cc.clusterAccessors[cluster]
 	if !ok {
-		accessor = newClusterAccessor(cluster, cc.clusterAccessorConfig)
+		accessor = newClusterAccessor(cc.cacheCtx, cluster, cc.clusterAccessorConfig)
 		cc.clusterAccessors[cluster] = accessor
 	}
 
@@ -656,6 +666,13 @@ func (cc *clusterCache) SetConnectionCreationRetryInterval(interval time.Duratio
 	cc.clusterAccessorConfig.ConnectionCreationRetryInterval = interval
 }
 
+// Shutdown can be used to shut down the ClusterCache in unit tests.
+// This method should only be used for tests because it hasn't been designed for production usage
+// in a manager (race conditions with manager shutdown etc.).
+func (cc *clusterCache) Shutdown() {
+	cc.cacheCtxCancel(errors.New("ClusterCache is shutdown"))
+}
+
 func validateAndDefaultOptions(opts *Options) error {
 	if opts.SecretClient == nil {
 		return errors.New("options.SecretClient must be set")
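
Shutdown cancels the cache context with a cause rather than a plain cancel. A standalone sketch of context.WithCancelCause semantics (same calls as above, outside the ClusterCache):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	// Same construction as in SetupWithManager.
	cacheCtx, cacheCtxCancel := context.WithCancelCause(context.Background())

	// Same call as in Shutdown.
	cacheCtxCancel(errors.New("ClusterCache is shutdown"))

	<-cacheCtx.Done()
	fmt.Println(cacheCtx.Err())          // context.Canceled
	fmt.Println(context.Cause(cacheCtx)) // ClusterCache is shutdown
}
```

Anything blocked on cacheCtx, here the per-cluster caches derived from it in createCachedClient, can report why it was stopped via context.Cause.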

controllers/clustercache/cluster_cache_test.go (2 additions, 0 deletions)

@@ -77,6 +77,7 @@ func TestReconcile(t *testing.T) {
 		client:                env.Manager.GetAPIReader(),
 		clusterAccessorConfig: accessorConfig,
 		clusterAccessors:      make(map[client.ObjectKey]*clusterAccessor),
+		cacheCtx:              context.Background(),
 	}
 
 	// Add a Cluster source and start it (queue will be later used to verify the source works correctly)
@@ -537,6 +538,7 @@ func TestClusterCacheConcurrency(t *testing.T) {
 	g.Expect(err).ToNot(HaveOccurred())
 	internalClusterCache, ok := cc.(*clusterCache)
 	g.Expect(ok).To(BeTrue())
+	defer internalClusterCache.Shutdown()
 
 	// Generate test clusters.
 	testClusters := generateTestClusters(clusterCount, brokenClusterPercentage)

controlplane/kubeadm/internal/cluster_test.go (1 addition, 0 deletions)

@@ -225,6 +225,7 @@ func TestGetWorkloadCluster(t *testing.T) {
 		},
 	}, controller.Options{MaxConcurrentReconciles: 10, SkipNameValidation: ptr.To(true)})
 	g.Expect(err).ToNot(HaveOccurred())
+	defer clusterCache.(interface{ Shutdown() }).Shutdown()
 
 	m := Management{
 		Client: env.GetClient(),
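
The clusterCache.(interface{ Shutdown() }).Shutdown() call here and in the suite_test.go changes below is a type assertion to an anonymous interface: Shutdown is defined only on the unexported clusterCache type, not on the exported ClusterCache interface, so test packages assert against an ad-hoc interface instead of naming the concrete type. A self-contained sketch of the idiom with stand-in types (not the real ClusterCache API):

```go
package main

import "fmt"

// Stand-ins for the exported interface and unexported implementation.
type ClusterCache interface {
	GetClient() string
}

type clusterCache struct{}

func (c *clusterCache) GetClient() string { return "client" }

// Shutdown exists only on the concrete type, not on ClusterCache.
func (c *clusterCache) Shutdown() { fmt.Println("caches stopped") }

func main() {
	var cc ClusterCache = &clusterCache{}

	// Assert to an ad-hoc interface that only requires Shutdown(),
	// without importing or naming the unexported concrete type.
	if s, ok := cc.(interface{ Shutdown() }); ok {
		defer s.Shutdown()
	}
	fmt.Println(cc.GetClient())
}
```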

exp/addons/internal/controllers/suite_test.go (4 additions, 0 deletions)

@@ -100,6 +100,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	reconciler := ClusterResourceSetReconciler{
 		Client: mgr.GetClient(),

exp/internal/controllers/suite_test.go (4 additions, 0 deletions)

@@ -65,6 +65,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&MachinePoolReconciler{
 		Client: mgr.GetClient(),

internal/controllers/cluster/suite_test.go (4 additions, 0 deletions)

@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.

internal/controllers/machine/machine_controller_noderef_test.go (1 addition, 0 deletions)

@@ -341,6 +341,7 @@ func TestGetNode(t *testing.T) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	defer clusterCache.(interface{ Shutdown() }).Shutdown()
 
 	r := &Reconciler{
 		ClusterCache: clusterCache,

internal/controllers/machine/suite_test.go (4 additions, 0 deletions)

@@ -83,6 +83,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.

internal/controllers/machinedeployment/suite_test.go (4 additions, 0 deletions)

@@ -89,6 +89,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&machinecontroller.Reconciler{
 		Client: mgr.GetClient(),

internal/controllers/machinehealthcheck/suite_test.go (4 additions, 0 deletions)

@@ -75,6 +75,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	// Setting ConnectionCreationRetryInterval to 2 seconds, otherwise client creation is
 	// only retried every 30s. If we get unlucky tests are then failing with timeout.

internal/controllers/machineset/suite_test.go (4 additions, 0 deletions)

@@ -88,6 +88,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&Reconciler{
 		Client: mgr.GetClient(),

internal/controllers/topology/cluster/suite_test.go (4 additions, 0 deletions)

@@ -86,6 +86,10 @@ func TestMain(m *testing.M) {
 	if err != nil {
 		panic(fmt.Sprintf("Failed to create ClusterCache: %v", err))
 	}
+	go func() {
+		<-ctx.Done()
+		clusterCache.(interface{ Shutdown() }).Shutdown()
+	}()
 
 	if err := (&Reconciler{
 		Client: mgr.GetClient(),

internal/test/envtest/environment.go (3 additions, 3 deletions)

@@ -151,6 +151,9 @@ func Run(ctx context.Context, input RunInput) int {
 	// Bootstrapping test environment
 	env := newEnvironment(input.ManagerCacheOptions, input.ManagerUncachedObjs...)
 
+	ctx, cancel := context.WithCancel(ctx)
+	env.cancelManager = cancel
+
 	if input.SetupIndexes != nil {
 		input.SetupIndexes(ctx, env.Manager)
 	}
@@ -385,9 +388,6 @@ func newEnvironment(managerCacheOptions cache.Options, uncachedObjs ...client.Ob
 
 // start starts the manager.
 func (e *Environment) start(ctx context.Context) {
-	ctx, cancel := context.WithCancel(ctx)
-	e.cancelManager = cancel
-
 	go func() {
 		fmt.Println("Starting the test environment manager")
 		if err := e.Manager.Start(ctx); err != nil {
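
Hoisting the cancelable context from start into Run means env.cancelManager now cancels the same ctx that Run hands to SetupIndexes and SetupReconcilers, which appears to be the ctx the new TestMain shutdown goroutines block on. A standalone sketch of that wiring, with illustrative names rather than the real envtest types:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancelManager := context.WithCancel(context.Background())

	// Stand-in for the shutdown goroutines added in the suite_test.go files:
	// they wake up once the manager context ends.
	done := make(chan struct{})
	go func() {
		<-ctx.Done()
		fmt.Println("clusterCache.Shutdown() would run here")
		close(done)
	}()

	cancelManager() // stand-in for stopping the test environment's manager

	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("shutdown goroutine never ran")
	}
}
```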
