Skip to content

Commit 8db27fc

Browse files
committed
Add watch label to controllers
1 parent 0d4e430 commit 8db27fc

12 files changed

+223
-8
lines changed

api/v1alpha4/common_types.go

+6
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ const (
5353
// on the reconciled object.
5454
PausedAnnotation = "cluster.x-k8s.io/paused"
5555

56+
// WatchLabel is a label othat can be applied to any Cluster API object.
57+
//
58+
// Controllers which allow for selective reconciliation may check this label and proceed
59+
// with reconciliation of the object only if this label and a configured value is present.
60+
WatchLabel = "cluster.x-k8s.io/watch"
61+
5662
// DeleteMachineAnnotation marks control plane and worker nodes that will be given priority for deletion
5763
// when KCP or a machineset scales down. This annotation is given top priority on all delete policies.
5864
DeleteMachineAnnotation = "cluster.x-k8s.io/delete-machine"

controllers/cluster_controller.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ const (
6464
// ClusterReconciler reconciles a Cluster object
6565
type ClusterReconciler struct {
6666
Client client.Client
67+
Config
6768

6869
restConfig *rest.Config
6970
recorder record.EventRecorder
@@ -78,7 +79,7 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
7879
handler.EnqueueRequestsFromMapFunc(r.controlPlaneMachineToCluster),
7980
).
8081
WithOptions(options).
81-
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
82+
WithEventFilter(predicates.ResourceNotPausedAndHasLabel(ctrl.LoggerFrom(ctx), r.Config.WatchLabelValue)).
8283
Build(r)
8384

8485
if err != nil {

controllers/machine_controller.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ var (
7474

7575
// MachineReconciler reconciles a Machine object
7676
type MachineReconciler struct {
77-
Client client.Client
77+
Client client.Client
78+
Config
7879
Tracker *remote.ClusterCacheTracker
7980

8081
controller controller.Controller
@@ -92,7 +93,7 @@ func (r *MachineReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
9293
controller, err := ctrl.NewControllerManagedBy(mgr).
9394
For(&clusterv1.Machine{}).
9495
WithOptions(options).
95-
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
96+
WithEventFilter(predicates.ResourceNotPausedAndHasLabel(ctrl.LoggerFrom(ctx), r.Config.WatchLabelValue)).
9697
Build(r)
9798
if err != nil {
9899
return errors.Wrap(err, "failed setting up with a controller manager")

controllers/machinedeployment_controller.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ var (
5454
// MachineDeploymentReconciler reconciles a MachineDeployment object
5555
type MachineDeploymentReconciler struct {
5656
Client client.Client
57+
Config
5758

5859
recorder record.EventRecorder
5960
restConfig *rest.Config
@@ -73,7 +74,7 @@ func (r *MachineDeploymentReconciler) SetupWithManager(ctx context.Context, mgr
7374
handler.EnqueueRequestsFromMapFunc(r.MachineSetToDeployments),
7475
).
7576
WithOptions(options).
76-
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
77+
WithEventFilter(predicates.ResourceNotPausedAndHasLabel(ctrl.LoggerFrom(ctx), r.Config.WatchLabelValue)).
7778
Build(r)
7879
if err != nil {
7980
return errors.Wrap(err, "failed setting up with a controller manager")

controllers/machinehealthcheck_controller.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ const (
6565

6666
// MachineHealthCheckReconciler reconciles a MachineHealthCheck object
6767
type MachineHealthCheckReconciler struct {
68-
Client client.Client
68+
Client client.Client
69+
Config
6970
Tracker *remote.ClusterCacheTracker
7071

7172
controller controller.Controller
@@ -80,7 +81,7 @@ func (r *MachineHealthCheckReconciler) SetupWithManager(ctx context.Context, mgr
8081
handler.EnqueueRequestsFromMapFunc(r.machineToMachineHealthCheck),
8182
).
8283
WithOptions(options).
83-
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
84+
WithEventFilter(predicates.ResourceNotPausedAndHasLabel(ctrl.LoggerFrom(ctx), r.Config.WatchLabelValue)).
8485
Build(r)
8586
if err != nil {
8687
return errors.Wrap(err, "failed setting up with a controller manager")

controllers/machineset_controller.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ var (
6767

6868
// MachineSetReconciler reconciles a MachineSet object
6969
type MachineSetReconciler struct {
70-
Client client.Client
70+
Client client.Client
71+
Config
7172
Tracker *remote.ClusterCacheTracker
7273

7374
recorder record.EventRecorder
@@ -88,7 +89,7 @@ func (r *MachineSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
8889
handler.EnqueueRequestsFromMapFunc(r.MachineToMachineSets),
8990
).
9091
WithOptions(options).
91-
WithEventFilter(predicates.ResourceNotPaused(ctrl.LoggerFrom(ctx))).
92+
WithEventFilter(predicates.ResourceNotPausedAndHasLabel(ctrl.LoggerFrom(ctx), r.Config.WatchLabelValue)).
9293
Build(r)
9394
if err != nil {
9495
return errors.Wrap(err, "failed setting up with a controller manager")

controllers/types.go

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controllers
18+
19+
// Config is a type to pass configuration from flags to controllers.
20+
type Config struct {
21+
// WatchLabelValue defines the label value a controller watches on.
22+
WatchLabelValue string
23+
}

main.go

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818
import (
1919
"context"
2020
"flag"
21+
"fmt"
2122
"math/rand"
2223
"net/http"
2324
_ "net/http/pprof"
@@ -58,6 +59,7 @@ var (
5859
leaderElectionRenewDeadline time.Duration
5960
leaderElectionRetryPeriod time.Duration
6061
watchNamespace string
62+
watchLabelValue string
6163
profilerAddress string
6264
clusterConcurrency int
6365
machineConcurrency int
@@ -102,6 +104,9 @@ func InitFlags(fs *pflag.FlagSet) {
102104
fs.StringVar(&watchNamespace, "namespace", "",
103105
"Namespace that the controller watches to reconcile cluster-api objects. If unspecified, the controller watches for cluster-api objects across all namespaces.")
104106

107+
fs.StringVar(&watchLabelValue, "watch-label", "",
108+
fmt.Sprintf("Label value that the controller watches to reconcile cluster-api objects. Label key is always %s. If unspecified, the controller watches for all cluster-api objects.", clusterv1.WatchLabel))
109+
105110
fs.StringVar(&profilerAddress, "profiler-address", "",
106111
"Bind address to expose the pprof profiler (e.g. localhost:6060)")
107112

test/e2e/mhc_remediations.go

+18
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,28 @@ func MachineRemediationSpec(ctx context.Context, inputGetter func() MachineRemed
9696
WaitForMachineRemediation: input.E2EConfig.GetIntervals(specName, "wait-machine-remediation"),
9797
})
9898

99+
<<<<<<< HEAD
100+
=======
101+
By("Setting a machine unhealthy and wait for MachineDeployment remediation")
102+
framework.DiscoverMachineHealthChecksAndWaitForRemediation(ctx, framework.DiscoverMachineHealthCheckAndWaitForRemediationInput{
103+
ClusterProxy: input.BootstrapClusterProxy,
104+
Cluster: clusterResources.Cluster,
105+
WaitForMachineRemediation: input.E2EConfig.GetIntervals(specName, "wait-machine-remediation"),
106+
})
107+
108+
>>>>>>> Add watch label to controllers
99109
By("PASSED!")
100110
})
101111

102112
It("Should successfully trigger KCP remediation", func() {
103113

104114
By("Creating a workload cluster")
105115

116+
<<<<<<< HEAD
106117
clusterctl.ApplyClusterTemplateAndWait(ctx, clusterctl.ApplyClusterTemplateAndWaitInput{
118+
=======
119+
clusterResources = clusterctl.ApplyClusterTemplateAndWait(ctx, clusterctl.ApplyClusterTemplateAndWaitInput{
120+
>>>>>>> Add watch label to controllers
107121
ClusterProxy: input.BootstrapClusterProxy,
108122
ConfigCluster: clusterctl.ConfigClusterInput{
109123
LogFolder: filepath.Join(input.ArtifactFolder, "clusters", input.BootstrapClusterProxy.GetName()),
@@ -120,7 +134,11 @@ func MachineRemediationSpec(ctx context.Context, inputGetter func() MachineRemed
120134
WaitForClusterIntervals: input.E2EConfig.GetIntervals(specName, "wait-cluster"),
121135
WaitForControlPlaneIntervals: input.E2EConfig.GetIntervals(specName, "wait-control-plane"),
122136
WaitForMachineDeployments: input.E2EConfig.GetIntervals(specName, "wait-worker-nodes"),
137+
<<<<<<< HEAD
123138
}, clusterResources)
139+
=======
140+
})
141+
>>>>>>> Add watch label to controllers
124142

125143
By("Setting a machine unhealthy and wait for KubeadmControlPlane remediation")
126144
framework.DiscoverMachineHealthChecksAndWaitForRemediation(ctx, framework.DiscoverMachineHealthCheckAndWaitForRemediationInput{

util/labels/helpers.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package labels
18+
19+
import (
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
22+
)
23+
24+
// HasWatchLabel returns true if the object has a label with the WatchLabel key matching the given value.
25+
func HasWatchLabel(o metav1.Object, labelValue string) bool {
26+
val, ok := o.GetLabels()[clusterv1.WatchLabel]
27+
if !ok {
28+
return false
29+
}
30+
return val == labelValue
31+
}

util/labels/helpers_test.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package labels
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/gomega"
23+
corev1 "k8s.io/api/core/v1"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
26+
)
27+
28+
func TestHasWatchLabel(t *testing.T) {
29+
g := NewWithT(t)
30+
31+
var testcases = []struct {
32+
name string
33+
obj metav1.Object
34+
input string
35+
expected bool
36+
}{
37+
{
38+
name: "should handle empty input",
39+
obj: &corev1.Node{
40+
ObjectMeta: metav1.ObjectMeta{
41+
Labels: map[string]string{
42+
"foo": "bar",
43+
},
44+
},
45+
Spec: corev1.NodeSpec{},
46+
Status: corev1.NodeStatus{},
47+
},
48+
input: "",
49+
expected: false,
50+
},
51+
{
52+
name: "should return false if no input string is give",
53+
obj: &corev1.Node{
54+
ObjectMeta: metav1.ObjectMeta{
55+
Labels: map[string]string{
56+
clusterv1.WatchLabel: "bar",
57+
},
58+
},
59+
},
60+
input: "",
61+
expected: false,
62+
},
63+
{
64+
name: "should return true if label matches",
65+
obj: &corev1.Node{
66+
ObjectMeta: metav1.ObjectMeta{
67+
Labels: map[string]string{
68+
clusterv1.WatchLabel: "bar",
69+
},
70+
},
71+
Spec: corev1.NodeSpec{},
72+
Status: corev1.NodeStatus{},
73+
},
74+
input: "bar",
75+
expected: true,
76+
},
77+
}
78+
79+
for _, tc := range testcases {
80+
t.Run(tc.name, func(t *testing.T) {
81+
res := HasWatchLabel(tc.obj, tc.input)
82+
g.Expect(res).To(Equal(tc.expected))
83+
})
84+
}
85+
}

util/predicates/generic_predicates.go

+42
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/go-logr/logr"
2323
"sigs.k8s.io/cluster-api/util/annotations"
24+
"sigs.k8s.io/cluster-api/util/labels"
2425

2526
"sigs.k8s.io/controller-runtime/pkg/client"
2627
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -127,6 +128,25 @@ func Any(logger logr.Logger, predicates ...predicate.Funcs) predicate.Funcs {
127128
}
128129
}
129130

131+
// ResourceHasLabel returns a predicate that returns true only if the provided resource contains
132+
// a label with the WatchLabel key and the configured label value exactly.
133+
func ResourceHasLabel(logger logr.Logger, labelValue string) predicate.Funcs {
134+
return predicate.Funcs{
135+
UpdateFunc: func(e event.UpdateEvent) bool {
136+
return processIfLabelMatch(logger.WithValues("predicate", "updateEvent"), e.ObjectNew, labelValue)
137+
},
138+
CreateFunc: func(e event.CreateEvent) bool {
139+
return processIfLabelMatch(logger.WithValues("predicate", "createEvent"), e.Object, labelValue)
140+
},
141+
DeleteFunc: func(e event.DeleteEvent) bool {
142+
return processIfLabelMatch(logger.WithValues("predicate", "deleteEvent"), e.Object, labelValue)
143+
},
144+
GenericFunc: func(e event.GenericEvent) bool {
145+
return processIfLabelMatch(logger.WithValues("predicate", "genericEvent"), e.Object, labelValue)
146+
},
147+
}
148+
}
149+
130150
// ResourceNotPaused returns a Predicate that returns true only if the provided resource does not contain the
131151
// paused annotation.
132152
// This implements a common requirement for all cluster-api and provider controllers skip reconciliation when the paused
@@ -157,6 +177,12 @@ func ResourceNotPaused(logger logr.Logger) predicate.Funcs {
157177
}
158178
}
159179

180+
// ResourceNotPausedAndHasLabel returns a predicate that returns true only if the
181+
// ResourceNotPaused and ResourceHasLabel predicates return true.
182+
func ResourceNotPausedAndHasLabel(logger logr.Logger, labelValue string) predicate.Funcs {
183+
return All(logger, ResourceNotPaused(logger), ResourceHasLabel(logger, labelValue))
184+
}
185+
160186
func processIfNotPaused(logger logr.Logger, obj client.Object) bool {
161187
kind := strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind)
162188
log := logger.WithValues("namespace", obj.GetNamespace(), kind, obj.GetName())
@@ -167,3 +193,19 @@ func processIfNotPaused(logger logr.Logger, obj client.Object) bool {
167193
log.V(4).Info("Resource is not paused, will attempt to map resource")
168194
return true
169195
}
196+
197+
func processIfLabelMatch(logger logr.Logger, obj client.Object, labelValue string) bool {
198+
// Return early if no labelValue was set.
199+
if labelValue == "" {
200+
return true
201+
}
202+
203+
kind := strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind)
204+
log := logger.WithValues("namespace", obj.GetNamespace(), kind, obj.GetName())
205+
if labels.HasWatchLabel(obj, labelValue) {
206+
log.V(4).Info("Resource matches label, will attempt to map resource")
207+
return true
208+
}
209+
log.V(4).Info("Resource does not match label, will not attempt to map resource")
210+
return false
211+
}

0 commit comments

Comments
 (0)