
Commit 37a60e5

sbueringer and Florian Gutmann committed
ClusterCacheTracker: non-blocking per-cluster locking
Co-authored-by: Florian Gutmann <[email protected]>
1 parent ebbd333 commit 37a60e5

File tree: 5 files changed, +247 -29 lines changed


controllers/remote/cluster_cache_tracker.go

Lines changed: 74 additions & 28 deletions
@@ -54,6 +54,7 @@ const (
 	healthCheckPollInterval       = 10 * time.Second
 	healthCheckRequestTimeout     = 5 * time.Second
 	healthCheckUnhealthyThreshold = 10
+	initialCacheSyncTimeout       = 5 * time.Minute
 	clusterCacheControllerName    = "cluster-cache-tracker"
 )
 
@@ -64,9 +65,15 @@ type ClusterCacheTracker struct {
 	client client.Client
 	scheme *runtime.Scheme
 
-	lock             sync.RWMutex
+	// clusterAccessorsLock is used to lock the access to the clusterAccessors map.
+	clusterAccessorsLock sync.RWMutex
+	// clusterAccessors is the map of clusterAccessor by cluster.
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
-	indexes          []Index
+	// clusterLock is a per-cluster lock used whenever we lock per-cluster actions
+	// like creating a client or adding watches.
+	clusterLock *keyedMutex
+
+	indexes []Index
 
 	// controllerPodMetadata is the Pod metadata of the controller using this ClusterCacheTracker.
 	// This is only set when the POD_NAMESPACE, POD_NAME and POD_UID environment variables are set.
@@ -129,16 +136,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
 		client:           manager.GetClient(),
 		scheme:           manager.GetScheme(),
 		clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
+		clusterLock:      newKeyedMutex(),
 		indexes:          options.Indexes,
 	}, nil
 }
 
 // GetClient returns a cached client for the given cluster.
 func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...)
 	if err != nil {
 		return nil, err
 	}
@@ -148,10 +153,7 @@ func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.Obje
 
 // GetRESTConfig returns a cached REST config for the given cluster.
 func (t *ClusterCacheTracker) GetRESTConfig(ctc context.Context, cluster client.ObjectKey) (*rest.Config, error) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	accessor, err := t.getClusterAccessorLH(ctc, cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctc, cluster, t.indexes...)
 	if err != nil {
 		return nil, err
 	}
@@ -169,28 +171,63 @@ type clusterAccessor struct {
 
 // clusterAccessorExists returns true if a clusterAccessor exists for cluster.
 func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bool {
-	t.lock.RLock()
-	defer t.lock.RUnlock()
+	t.clusterAccessorsLock.RLock()
+	defer t.clusterAccessorsLock.RUnlock()
 
 	_, exists := t.clusterAccessors[cluster]
 	return exists
 }
 
-// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
-// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
-func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
-	a := t.clusterAccessors[cluster]
+// getClusterAccessor returns a clusterAccessor for cluster.
+// It first tries to return an already-created clusterAccessor.
+// It then falls back to create a new clusterAccessor if needed.
+// If there is already another go routine trying to create a clusterAccessor
+// for the same cluster, an error is returned.
+func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
+	log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name))
+
+	loadExistingAccessor := func() *clusterAccessor {
+		t.clusterAccessorsLock.RLock()
+		defer t.clusterAccessorsLock.RUnlock()
+		return t.clusterAccessors[cluster]
+	}
+	storeAccessor := func(a *clusterAccessor) {
+		t.clusterAccessorsLock.Lock()
+		defer t.clusterAccessorsLock.Unlock()
+		t.clusterAccessors[cluster] = a
+	}
+
+	// If the clusterAccessor already exists, return early.
+	a := loadExistingAccessor()
 	if a != nil {
 		return a, nil
 	}
 
-	a, err := t.newClusterAccessor(ctx, cluster, indexes...)
-	if err != nil {
-		return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
+	// clusterAccessor doesn't exist yet, we might have to initialize one.
+	// Lock on the cluster to ensure only one clusterAccessor is initialized
+	// for the cluster at the same time.
+	// Return an error if another go routine already tries to create a clusterAccessor.
+	unlockCluster, ok := t.clusterLock.TryLock(cluster)
+	if !ok {
+		return nil, errors.Errorf("error creating new cluster accessor: another go routine is already trying to create the cluster accessor for this cluster")
 	}
+	defer unlockCluster()
 
-	t.clusterAccessors[cluster] = a
+	// Until we got the cluster lock a different goroutine might have initialized the clusterAccessor
+	// for this cluster successfully already. If this is the case we return it.
+	a = loadExistingAccessor()
+	if a != nil {
+		return a, nil
+	}
 
+	// We are the go routine who has to initialize the clusterAccessor.
+	log.V(4).Info("Creating new cluster accessor")
+	a, err := t.newClusterAccessor(ctx, cluster, indexes...)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating new cluster accessor")
+	}
+	log.V(4).Info("Storing new cluster accessor")
+	storeAccessor(a)
 	return a, nil
 }
 
@@ -265,7 +302,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 
 	// Start the cache!!!
 	go cache.Start(cacheCtx) //nolint:errcheck
-	if !cache.WaitForCacheSync(cacheCtx) {
+
+	// Wait until the cache is initially synced
+	cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
+	defer cacheSyncCtxCancel()
+	if !cache.WaitForCacheSync(cacheSyncCtx) {
+		cache.Stop()
 		return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
 	}
 
@@ -337,8 +379,8 @@ func (t *ClusterCacheTracker) createClient(config *rest.Config, cluster client.O
 
 // deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
 func (t *ClusterCacheTracker) deleteAccessor(_ context.Context, cluster client.ObjectKey) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
+	t.clusterAccessorsLock.Lock()
+	defer t.clusterAccessorsLock.Unlock()
 
 	a, exists := t.clusterAccessors[cluster]
 	if !exists {
@@ -387,14 +429,18 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 		return errors.New("input.Name is required")
 	}
 
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...)
+	a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
 	if err != nil {
 		return err
 	}
 
+	// We have to lock the cluster, so that the watch is not created multiple times in parallel.
+	unlock, ok := t.clusterLock.TryLock(input.Cluster)
+	if !ok {
+		return errors.Errorf("failed to add watch: another go routine is already trying to create the cluster accessor")
+	}
+	defer unlock()
+
 	if a.watches.Has(input.Name) {
 		t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
 		return nil
@@ -505,7 +551,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 	// An error returned implies the health check has failed a sufficient number of
 	// times for the cluster to be considered unhealthy
 	// NB. we are ignoring ErrWaitTimeout because this error happens when the channel is close, that in this case
-	// happens when the cache is explicitly stopped.F
+	// happens when the cache is explicitly stopped.
 	if err != nil && err != wait.ErrWaitTimeout {
 		t.log.Error(err, "Error health checking cluster", "Cluster", klog.KRef(in.cluster.Namespace, in.cluster.Name))
 		t.deleteAccessor(ctx, in.cluster)
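
With the per-cluster TryLock, GetClient and Watch no longer block on a tracker-wide mutex; instead they can return an "another go routine is already trying to create the cluster accessor" error while the accessor is being initialized. The following is a minimal, hypothetical sketch (not part of this commit; the exampleReconciler type, reconcileRemote name, and the 30-second requeue interval are assumptions) of how a caller might treat that error as transient and requeue rather than fail:

// Hypothetical usage sketch; not part of this commit.
package example

import (
	"context"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/cluster-api/controllers/remote"
)

type exampleReconciler struct {
	Tracker *remote.ClusterCacheTracker
}

func (r *exampleReconciler) reconcileRemote(ctx context.Context, clusterKey client.ObjectKey) (ctrl.Result, error) {
	remoteClient, err := r.Tracker.GetClient(ctx, clusterKey)
	if err != nil {
		// With the non-blocking per-cluster lock this error can simply mean that
		// another goroutine is creating the clusterAccessor right now; requeue
		// instead of tying up a worker.
		return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
	}

	// Use the cached client against the workload cluster.
	_ = remoteClient
	return ctrl.Result{}, nil
}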

controllers/remote/keyedmutex.go

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import "sync"
+
+// keyedMutex is a mutex locking on the key provided to the Lock function.
+// Only one caller can hold the lock for a specific key at a time.
+// A second Lock call if the lock is already held for a key returns false.
+type keyedMutex struct {
+	locksMtx sync.Mutex
+	locks    map[interface{}]*sync.Mutex
+}
+
+// newKeyedMutex creates a new keyed mutex ready for use.
+func newKeyedMutex() *keyedMutex {
+	return &keyedMutex{
+		locks: make(map[interface{}]*sync.Mutex),
+	}
+}
+
+// unlock unlocks a currently locked key.
+type unlock func()
+
+// TryLock locks the passed in key if it's not already locked.
+// Returns the unlock function to release the lock on the key.
+// A second Lock call if the lock is already held for a key returns false.
+// In the ClusterCacheTracker case the key is the ObjectKey for a cluster.
+func (k *keyedMutex) TryLock(key interface{}) (unlock, bool) {
+	// Get the lock if it doesn't exist already.
+	// If it does exist, return false.
+	l, ok := func() (*sync.Mutex, bool) {
+		k.locksMtx.Lock()
+		defer k.locksMtx.Unlock()
+
+		l, ok := k.locks[key]
+		if !ok {
+			// Lock doesn't exist yet, create one and return it.
+			l = &sync.Mutex{}
+			k.locks[key] = l
+			return l, true
+		}
+
+		// Lock already exists, return false.
+		return nil, false
+	}()
+
+	// Return false if another go routine already holds the lock for this key (e.g. Cluster).
+	if !ok {
+		return nil, false
+	}
+
+	// Lock for the current key (e.g. Cluster).
+	l.Lock()
+
+	// Unlock the key (e.g. Cluster) and remove it from the lock map.
+	return func() {
+		k.locksMtx.Lock()
+		defer k.locksMtx.Unlock()
+
+		l.Unlock()
+		delete(k.locks, key)
+	}, true
+}
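
The tracker always pairs TryLock with a deferred unlock while it performs the per-cluster work. A minimal sketch of that call pattern, in package remote against the keyedMutex above (the initializeClusterOnce helper and its callback are hypothetical, for illustration only):

// Hypothetical helper; not part of this commit.
package remote

import "errors"

func initializeClusterOnce(km *keyedMutex, clusterKey interface{}, initializeCluster func() error) error {
	unlock, ok := km.TryLock(clusterKey)
	if !ok {
		// Another goroutine holds the lock for this cluster: fail fast and let
		// the caller retry later instead of blocking.
		return errors.New("another goroutine is already initializing this cluster")
	}
	defer unlock()

	return initializeCluster()
}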

controllers/remote/keyedmutex_test.go

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"testing"
+
+	. "github.com/onsi/gomega"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+func TestKeyedMutex(t *testing.T) {
+	t.Run("Lock a Cluster and ensures the second Lock on the same Cluster returns false", func(t *testing.T) {
+		t.Parallel()
+		g := NewWithT(t)
+
+		cluster1 := client.ObjectKey{Namespace: metav1.NamespaceDefault, Name: "Cluster1"}
+		km := newKeyedMutex()
+
+		// Try to lock cluster1.
+		// Should work as nobody currently holds the lock for cluster1.
+		unlock, ok := km.TryLock(cluster1)
+		g.Expect(ok).To(BeTrue())
+
+		// Try to lock cluster1 again.
+		// Shouldn't work as cluster1 is already locked.
+		_, ok = km.TryLock(cluster1)
+		g.Expect(ok).To(BeFalse())
+
+		// Unlock cluster1.
+		unlock()
+
+		// Ensure that the lock was cleaned up from the internal map.
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+
+	t.Run("Can lock different Clusters in parallel but each one only once", func(t *testing.T) {
+		g := NewWithT(t)
+		km := newKeyedMutex()
+		clusters := []client.ObjectKey{
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster1"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster2"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster3"},
+			{Namespace: metav1.NamespaceDefault, Name: "Cluster4"},
+		}
+
+		// Run this twice to ensure Clusters can be locked again
+		// after they have been unlocked.
+		for i := 0; i < 2; i++ {
+			unlocks := make([]unlock, 0, len(clusters))
+
+			// Lock all Clusters (should work).
+			for _, key := range clusters {
+				unlock, ok := km.TryLock(key)
+				g.Expect(ok).To(BeTrue())
+				unlocks = append(unlocks, unlock)
+			}
+
+			// Ensure Clusters can't be locked again.
+			for _, key := range clusters {
+				_, ok := km.TryLock(key)
+				g.Expect(ok).To(BeFalse())
+			}
+
+			// Unlock all Clusters.
+			for _, unlock := range unlocks {
+				unlock()
+			}
+		}
+
+		// Ensure that the lock was cleaned up from the internal map.
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+}

go.mod

Lines changed: 4 additions & 1 deletion
@@ -111,7 +111,7 @@ require (
 	github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
 	github.com/prometheus/client_golang v1.13.0
 	github.com/prometheus/client_model v0.2.0 // indirect
-	github.com/prometheus/common v0.37.0 // indirect
+	github.com/prometheus/common v0.37.0
 	github.com/prometheus/procfs v0.8.0 // indirect
 	github.com/rivo/uniseg v0.4.2 // indirect
 	github.com/russross/blackfriday v1.5.2 // indirect
@@ -150,6 +150,8 @@
 	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
 )
 
+require github.com/stretchr/testify v1.8.0
+
 require (
 	cloud.google.com/go/compute v1.7.0 // indirect
 	github.com/blang/semver/v4 v4.0.0 // indirect
@@ -158,4 +160,5 @@
 	github.com/google/gnostic v0.6.9 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pelletier/go-toml/v2 v2.0.5 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 )

go.sum

Lines changed: 2 additions & 0 deletions
@@ -394,6 +394,7 @@ github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht
 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
 github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
 github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
+github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
 github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -478,6 +479,7 @@ github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00/go.mod
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
 github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
