Commit 45f007c

review fixes
1 parent 1d6c718 commit 45f007c

4 files changed: +85 -83 lines changed

4 files changed

+85
-83
lines changed

controllers/remote/cluster_cache_healthcheck_test.go

Lines changed: 12 additions & 3 deletions
@@ -140,7 +140,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// Make sure this passes for at least for some seconds, to give the health check goroutine time to run.
-			g.Consistently(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeTrue())
+			g.Consistently(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeTrue())
 		})
 
 		t.Run("with an invalid path", func(t *testing.T) {
@@ -162,7 +165,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// This should succeed after N consecutive failed requests.
-			g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
+			g.Eventually(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeFalse())
 		})
 
 		t.Run("with an invalid config", func(t *testing.T) {
@@ -193,7 +199,10 @@ func TestClusterCacheHealthCheck(t *testing.T) {
 			})
 
 			// This should succeed after N consecutive failed requests.
-			g.Eventually(func() bool { return cct.clusterAccessorExists(testClusterKey) }, 5*time.Second, 1*time.Second).Should(BeFalse())
+			g.Eventually(func() bool {
+				_, ok := cct.loadAccessor(testClusterKey)
+				return ok
+			}, 5*time.Second, 1*time.Second).Should(BeFalse())
 		})
 	})
 }

controllers/remote/cluster_cache_tracker.go

Lines changed: 40 additions & 32 deletions
@@ -58,6 +58,10 @@ const (
 	clusterCacheControllerName = "cluster-cache-tracker"
 )
 
+// ErrClusterLocked is returned in methods that require cluster-level locking
+// if the cluster is already locked by another concurrent call.
+var ErrClusterLocked = errors.New("cluster is locked already")
+
 // ClusterCacheTracker manages client caches for workload clusters.
 type ClusterCacheTracker struct {
 	log logr.Logger
@@ -67,10 +71,10 @@ type ClusterCacheTracker struct {
 
 	// clusterAccessorsLock is used to lock the access to the clusterAccessors map.
 	clusterAccessorsLock sync.RWMutex
-	// clusterAccessors is the map of clusterAccessor by cluster.
+	// clusterAccessors is the map of clusterAccessors by cluster.
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
-	// clusterLock is a per-cluster lock used whenever we lock per-cluster actions
-	// like creating a client or adding watches.
+	// clusterLock is a per-cluster lock used whenever we're locking for a specific cluster.
+	// E.g. for actions like creating a client or adding watches.
 	clusterLock *keyedMutex
 
 	indexes []Index
@@ -178,6 +182,23 @@ func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bo
 	return exists
 }
 
+// loadAccessor loads a clusterAccessor.
+func (t *ClusterCacheTracker) loadAccessor(cluster client.ObjectKey) (*clusterAccessor, bool) {
+	t.clusterAccessorsLock.RLock()
+	defer t.clusterAccessorsLock.RUnlock()
+
+	accessor, ok := t.clusterAccessors[cluster]
+	return accessor, ok
+}
+
+// storeAccessor stores a clusterAccessor.
+func (t *ClusterCacheTracker) storeAccessor(cluster client.ObjectKey, accessor *clusterAccessor) {
+	t.clusterAccessorsLock.Lock()
+	defer t.clusterAccessorsLock.Unlock()
+
+	t.clusterAccessors[cluster] = accessor
+}
+
 // getClusterAccessor returns a clusterAccessor for cluster.
 // It first tries to return an already-created clusterAccessor.
 // It then falls back to create a new clusterAccessor if needed.
@@ -186,49 +207,36 @@ func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bo
 func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
 	log := ctrl.LoggerFrom(ctx, "cluster", klog.KRef(cluster.Namespace, cluster.Name))
 
-	loadExistingAccessor := func() *clusterAccessor {
-		t.clusterAccessorsLock.RLock()
-		defer t.clusterAccessorsLock.RUnlock()
-		return t.clusterAccessors[cluster]
-	}
-	storeAccessor := func(a *clusterAccessor) {
-		t.clusterAccessorsLock.Lock()
-		defer t.clusterAccessorsLock.Unlock()
-		t.clusterAccessors[cluster] = a
-	}
-
 	// If the clusterAccessor already exists, return early.
-	a := loadExistingAccessor()
-	if a != nil {
-		return a, nil
+	if accessor, ok := t.loadAccessor(cluster); ok {
+		return accessor, nil
 	}
 
 	// clusterAccessor doesn't exist yet, we might have to initialize one.
 	// Lock on the cluster to ensure only one clusterAccessor is initialized
 	// for the cluster at the same time.
 	// Return an error if another go routine already tries to create a clusterAccessor.
-	unlockCluster, ok := t.clusterLock.TryLock(cluster)
-	if !ok {
-		return nil, errors.Errorf("error creating new cluster accessor: another go routine is already trying to create the cluster accessor for this cluster")
+	if ok := t.clusterLock.TryLock(cluster); !ok {
+		return nil, errors.Wrapf(ErrClusterLocked, "failed to create cluster accessor: failed to get lock for cluster")
 	}
-	defer unlockCluster()
+	defer t.clusterLock.Unlock(cluster)
 
 	// Until we got the cluster lock a different goroutine might have initialized the clusterAccessor
 	// for this cluster successfully already. If this is the case we return it.
-	a = loadExistingAccessor()
-	if a != nil {
-		return a, nil
+	if accessor, ok := t.loadAccessor(cluster); ok {
		return accessor, nil
 	}
 
 	// We are the go routine who has to initialize the clusterAccessor.
 	log.V(4).Info("Creating new cluster accessor")
-	a, err := t.newClusterAccessor(ctx, cluster, indexes...)
+	accessor, err := t.newClusterAccessor(ctx, cluster, indexes...)
 	if err != nil {
-		return nil, errors.Wrap(err, "error creating new cluster accessor")
+		return nil, errors.Wrap(err, "failed to create cluster accessor")
 	}
+
 	log.V(4).Info("Storing new cluster accessor")
-	storeAccessor(a)
-	return a, nil
+	t.storeAccessor(cluster, accessor)
+	return accessor, nil
 }
 
 // newClusterAccessor creates a new clusterAccessor.
@@ -435,11 +443,11 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 	}
 
 	// We have to lock the cluster, so that the watch is not created multiple times in parallel.
-	unlock, ok := t.clusterLock.TryLock(input.Cluster)
+	ok := t.clusterLock.TryLock(input.Cluster)
 	if !ok {
-		return errors.Errorf("failed to add watch: another go routine is already trying to create the cluster accessor")
+		return errors.Wrapf(ErrClusterLocked, "failed to add watch: error getting lock for cluster")
 	}
-	defer unlock()
+	defer t.clusterLock.Unlock(input.Cluster)
 
 	if a.watches.Has(input.Name) {
 		t.log.V(6).Info("Watch already exists", "Cluster", klog.KRef(input.Cluster.Namespace, input.Cluster.Name), "name", input.Name)
@@ -518,7 +526,7 @@ func (t *ClusterCacheTracker) healthCheckCluster(ctx context.Context, in *health
 			return false, nil
 		}
 
-		if !t.clusterAccessorExists(in.cluster) {
+		if _, ok := t.loadAccessor(in.cluster); !ok {
 			// Cache for this cluster has already been cleaned up.
 			// Nothing to do, so return true.
 			return true, nil
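
Note (not part of this commit): exporting ErrClusterLocked as a sentinel lets callers treat "another goroutine holds the per-cluster lock" as a transient condition rather than a hard failure, since errors.Is can still match it through the errors.Wrapf calls above (assuming a github.com/pkg/errors version whose wrapping supports Unwrap). A minimal, hypothetical caller sketch; the helper name and shape are illustrative, remote.ErrClusterLocked comes from this diff, and the tracker's GetClient method is assumed from the package rather than shown here:

package example

import (
	"context"
	"errors"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/cluster-api/controllers/remote"
)

// getWorkloadClient asks the tracker for a workload cluster client and turns
// lock contention into a requeue instead of surfacing it as an error.
func getWorkloadClient(ctx context.Context, tracker *remote.ClusterCacheTracker, clusterKey client.ObjectKey) (client.Client, ctrl.Result, error) {
	c, err := tracker.GetClient(ctx, clusterKey)
	if err != nil {
		if errors.Is(err, remote.ErrClusterLocked) {
			// Another goroutine is initializing the accessor for this cluster; retry shortly.
			return nil, ctrl.Result{Requeue: true}, nil
		}
		return nil, ctrl.Result{}, err
	}
	return c, ctrl.Result{}, nil
}

A caller might prefer the requeue path because the contention is expected and short-lived, so it need not show up as a reconcile error.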

controllers/remote/keyedmutex.go

Lines changed: 26 additions & 34 deletions
@@ -16,63 +16,55 @@ limitations under the License.
 
 package remote
 
-import "sync"
+import (
+	"sync"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
 
 // keyedMutex is a mutex locking on the key provided to the Lock function.
 // Only one caller can hold the lock for a specific key at a time.
 // A second Lock call if the lock is already held for a key returns false.
 type keyedMutex struct {
 	locksMtx sync.Mutex
-	locks map[interface{}]*sync.Mutex
+	locks map[client.ObjectKey]*sync.Mutex
 }
 
 // newKeyedMutex creates a new keyed mutex ready for use.
 func newKeyedMutex() *keyedMutex {
 	return &keyedMutex{
-		locks: make(map[interface{}]*sync.Mutex),
+		locks: make(map[client.ObjectKey]*sync.Mutex),
 	}
 }
 
-// unlock unlocks a currently locked key.
-type unlock func()
-
 // TryLock locks the passed in key if it's not already locked.
-// Returns the unlock function to release the lock on the key.
 // A second Lock call if the lock is already held for a key returns false.
 // In the ClusterCacheTracker case the key is the ObjectKey for a cluster.
-func (k *keyedMutex) TryLock(key interface{}) (unlock, bool) {
-	// Get the lock if it doesn't exist already.
-	// If it does exist, return false.
-	l, ok := func() (*sync.Mutex, bool) {
-		k.locksMtx.Lock()
-		defer k.locksMtx.Unlock()
-
-		_, ok := k.locks[key]
-		if !ok {
-			// Lock doesn't exist yet, create one and return it.
-			l := &sync.Mutex{}
-			k.locks[key] = l
-			return l, true
-		}
-
-		// Lock already exists, return false.
-		return nil, false
-	}()
+func (k *keyedMutex) TryLock(key client.ObjectKey) bool {
+	k.locksMtx.Lock()
+	defer k.locksMtx.Unlock()
 
-	// Return false if another go routine already holds the lock for this key (e.g. Cluster).
-	if !ok {
-		return nil, false
+	// Check if there is already a lock for this key (e.g. Cluster).
+	if _, ok := k.locks[key]; ok {
+		// There is already a lock, return false.
+		return false
 	}
 
-	// Lock for the current key (e.g. Cluster).
+	// Lock doesn't exist yet, create and lock the lock.
+	l := &sync.Mutex{}
+	k.locks[key] = l
 	l.Lock()
 
-	// Unlock the key (e.g. Cluster) and remove it from the lock map.
-	return func() {
-		k.locksMtx.Lock()
-		defer k.locksMtx.Unlock()
+	return true
+}
+
+// Unlock unlocks the key.
+func (k *keyedMutex) Unlock(key client.ObjectKey) {
+	k.locksMtx.Lock()
+	defer k.locksMtx.Unlock()
 
+	if l, ok := k.locks[key]; ok {
 		l.Unlock()
 		delete(k.locks, key)
-	}, true
+	}
 }
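
Note (not part of this commit): the unlock responsibility moves from a returned closure to an explicit Unlock(key) call, so only the goroutine whose TryLock returned true should unlock, and everyone else backs off instead of blocking. A small in-package sketch of that discipline with a hypothetical helper name; keyedMutex.TryLock, keyedMutex.Unlock, and ErrClusterLocked are as introduced or changed in this commit:

package remote

import (
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// initClusterOnce shows the intended pattern: losers of the TryLock race
// surface ErrClusterLocked so callers can retry later, and only the winner
// defers the Unlock for the key it actually locked.
func initClusterOnce(km *keyedMutex, key client.ObjectKey, doInit func() error) error {
	if ok := km.TryLock(key); !ok {
		return ErrClusterLocked
	}
	defer km.Unlock(key)

	return doInit()
}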

controllers/remote/keyedmutex_test.go

Lines changed: 7 additions & 14 deletions
@@ -34,16 +34,14 @@ func TestKeyedMutex(t *testing.T) {
 
 		// Try to lock cluster1.
 		// Should work as nobody currently holds the lock for cluster1.
-		unlock, ok := km.TryLock(cluster1)
-		g.Expect(ok).To(BeTrue())
+		g.Expect(km.TryLock(cluster1)).To(BeTrue())
 
 		// Try to lock cluster1 again.
 		// Shouldn't work as cluster1 is already locked.
-		_, ok = km.TryLock(cluster1)
-		g.Expect(ok).To(BeFalse())
+		g.Expect(km.TryLock(cluster1)).To(BeFalse())
 
 		// Unlock cluster1.
-		unlock()
+		km.Unlock(cluster1)
 
 		// Ensure that the lock was cleaned up from the internal map.
 		g.Expect(km.locks).To(HaveLen(0))
@@ -62,24 +60,19 @@ func TestKeyedMutex(t *testing.T) {
 		// Run this twice to ensure Clusters can be locked again
 		// after they have been unlocked.
 		for i := 0; i < 2; i++ {
-			unlocks := make([]unlock, 0, len(clusters))
-
 			// Lock all Clusters (should work).
 			for _, key := range clusters {
-				unlock, ok := km.TryLock(key)
-				g.Expect(ok).To(BeTrue())
-				unlocks = append(unlocks, unlock)
+				g.Expect(km.TryLock(key)).To(BeTrue())
 			}
 
 			// Ensure Clusters can't be locked again.
 			for _, key := range clusters {
-				_, ok := km.TryLock(key)
-				g.Expect(ok).To(BeFalse())
+				g.Expect(km.TryLock(key)).To(BeFalse())
 			}
 
 			// Unlock all Clusters.
-			for _, unlock := range unlocks {
-				unlock()
+			for _, key := range clusters {
+				km.Unlock(key)
 			}
 		}
 