Skip to content

Commit 1e4af0d

Browse files
authored
Merge pull request #2169 from chuckha/failure-domain
✨ implement failure domain picking
2 parents 403fbd8 + 205dd64 commit 1e4af0d

File tree

9 files changed

+380
-22
lines changed

9 files changed

+380
-22
lines changed

controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
kerrors "k8s.io/apimachinery/pkg/util/errors"
3434
"k8s.io/apiserver/pkg/storage/names"
3535
"k8s.io/client-go/tools/record"
36+
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
3637
ctrl "sigs.k8s.io/controller-runtime"
3738
"sigs.k8s.io/controller-runtime/pkg/client"
3839
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -226,6 +227,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
226227
// Once we start handling upgrade, we'll need to filter this list and act appropriately
227228
numMachines := len(ownedMachines.Items)
228229
desiredReplicas := int(*kcp.Spec.Replicas)
230+
229231
switch {
230232
// We are creating the first replica
231233
case numMachines < desiredReplicas && numMachines == 0:
@@ -423,7 +425,26 @@ func (r *KubeadmControlPlaneReconciler) generateKubeadmConfig(ctx context.Contex
423425
return bootstrapRef, nil
424426
}
425427

428+
func (r *KubeadmControlPlaneReconciler) failureDomainForScaleUp(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) (*string, error) {
429+
// Don't do anything if there are no failure domains defined on the cluster.
430+
if len(cluster.Status.FailureDomains) == 0 {
431+
return nil, nil
432+
}
433+
machineList, err := r.getMachines(ctx, types.NamespacedName{Namespace: cluster.GetNamespace(), Name: cluster.GetName()})
434+
if err != nil {
435+
return nil, err
436+
}
437+
machineList = r.filterOwnedMachines(kcp, machineList)
438+
picker := internal.FailureDomainPicker{Log: r.Log}
439+
failureDomain := picker.PickFewest(cluster.Status.FailureDomains.FilterControlPlane(), machineList.Items)
440+
return &failureDomain, nil
441+
}
442+
426443
func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster, infraRef, bootstrapRef *corev1.ObjectReference) error {
444+
fd, err := r.failureDomainForScaleUp(ctx, kcp, cluster)
445+
if err != nil {
446+
return err
447+
}
427448
machine := &clusterv1.Machine{
428449
ObjectMeta: metav1.ObjectMeta{
429450
Labels: generateKubeadmControlPlaneLabels(cluster.Name),
@@ -439,6 +460,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
439460
},
440461
},
441462
}
463+
machine.Spec.FailureDomain = fd
442464

443465
owner := metav1.NewControllerRef(kcp, controlplanev1.GroupVersion.WithKind("KubeadmControlPlane"))
444466
if owner != nil {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"sort"
21+
22+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
23+
)
24+
25+
// failureDomainAggregation pairs a failure domain id with the number of
// machines counted in that failure domain.
type failureDomainAggregation struct {
	id    string
	count int
}

// failureDomainAggregations implements sort.Interface. Elements are ordered by
// ascending count; elements with equal counts are ordered by ascending id.
type failureDomainAggregations []failureDomainAggregation

// Len is the number of elements in the collection.
func (f failureDomainAggregations) Len() int {
	return len(f)
}

// Less reports whether the element with
// index i should sort before the element with index j.
func (f failureDomainAggregations) Less(i, j int) bool {
	if f[i].count != f[j].count {
		return f[i].count < f[j].count
	}
	// Break ties on id. The aggregations are gathered by ranging over a map
	// and sort.Sort is not stable, so comparing counts alone would let the
	// first element differ from run to run whenever counts are equal.
	return f[i].id < f[j].id
}

// Swap swaps the elements with indexes i and j.
func (f failureDomainAggregations) Swap(i, j int) {
	f[i], f[j] = f[j], f[i]
}
46+
47+
// logger is the subset of logr.Logger that the picker calls.
type logger interface {
	Info(message string, kv ...interface{})
}

// FailureDomainPicker selects a failure domain from a set of failure domains
// based on how a list of machines is spread across them.
type FailureDomainPicker struct {
	// Log receives a message for each machine whose failure domain is not
	// part of the set handed to the picker.
	Log logger
}
56+
57+
// PickMost returns the failure domain with the most number of machines.
58+
func (f *FailureDomainPicker) PickMost(failureDomains clusterv1.FailureDomains, machines []clusterv1.Machine) string {
59+
aggregations := f.pick(failureDomains, machines)
60+
if len(aggregations) == 0 {
61+
return ""
62+
}
63+
sort.Sort(sort.Reverse(aggregations))
64+
return aggregations[0].id
65+
66+
}
67+
68+
// PickFewest returns the failure domain with the fewest number of machines.
69+
func (f *FailureDomainPicker) PickFewest(failureDomains clusterv1.FailureDomains, machines []clusterv1.Machine) string {
70+
aggregations := f.pick(failureDomains, machines)
71+
if len(aggregations) == 0 {
72+
return ""
73+
}
74+
sort.Sort(aggregations)
75+
return aggregations[0].id
76+
}
77+
78+
func (f *FailureDomainPicker) pick(failureDomains clusterv1.FailureDomains, machines []clusterv1.Machine) failureDomainAggregations {
79+
if len(failureDomains) == 0 {
80+
return failureDomainAggregations{}
81+
}
82+
83+
counters := map[string]int{}
84+
85+
// Initialize the known failure domain keys to find out if an existing machine is in an unsupported failure domain.
86+
for fd := range failureDomains {
87+
counters[fd] = 0
88+
}
89+
90+
// Count how many machines are in each failure domain.
91+
for _, m := range machines {
92+
if m.Spec.FailureDomain == nil {
93+
continue
94+
}
95+
id := *m.Spec.FailureDomain
96+
if _, ok := failureDomains[id]; !ok {
97+
f.Log.Info("unknown failure domain", "machine-name", m.GetName(), "failure-domain-id", id, "known-failure-domains", failureDomains)
98+
continue
99+
}
100+
counters[id]++
101+
}
102+
103+
aggregations := make(failureDomainAggregations, 0)
104+
105+
// Gather up tuples of failure domains ids and counts
106+
for fd, count := range counters {
107+
aggregations = append(aggregations, failureDomainAggregation{id: fd, count: count})
108+
}
109+
110+
return aggregations
111+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/klog/klogr"
23+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
24+
)
25+
26+
func TestNewFailureDomainPicker(t *testing.T) {
27+
a := "us-west-1a"
28+
b := "us-west-1b"
29+
30+
fds := clusterv1.FailureDomains{
31+
a: clusterv1.FailureDomainSpec{},
32+
b: clusterv1.FailureDomainSpec{},
33+
}
34+
machinea := clusterv1.Machine{Spec: clusterv1.MachineSpec{FailureDomain: &a}}
35+
machineb := clusterv1.Machine{Spec: clusterv1.MachineSpec{FailureDomain: &b}}
36+
machinenil := clusterv1.Machine{Spec: clusterv1.MachineSpec{FailureDomain: nil}}
37+
38+
testcases := []struct {
39+
name string
40+
logger logger
41+
fds clusterv1.FailureDomains
42+
machines []clusterv1.Machine
43+
expected []string
44+
}{
45+
{
46+
name: "simple",
47+
expected: []string{""},
48+
},
49+
{
50+
name: "no machines",
51+
fds: clusterv1.FailureDomains{
52+
a: clusterv1.FailureDomainSpec{},
53+
},
54+
expected: []string{a},
55+
},
56+
{
57+
name: "one machine in a failure domain",
58+
fds: fds,
59+
machines: []clusterv1.Machine{
60+
machinea,
61+
},
62+
expected: []string{b},
63+
},
64+
{
65+
name: "no failure domain specified on machine",
66+
fds: clusterv1.FailureDomains{
67+
a: clusterv1.FailureDomainSpec{},
68+
},
69+
machines: []clusterv1.Machine{
70+
machinenil,
71+
},
72+
expected: []string{a, b},
73+
},
74+
{
75+
name: "mismatched failure domain on machine",
76+
logger: klogr.New(),
77+
fds: clusterv1.FailureDomains{
78+
a: clusterv1.FailureDomainSpec{},
79+
},
80+
machines: []clusterv1.Machine{
81+
machineb,
82+
},
83+
expected: []string{a},
84+
},
85+
{
86+
name: "failure domains and no machines should return a valid failure domain",
87+
fds: fds,
88+
machines: []clusterv1.Machine{},
89+
expected: []string{a, b},
90+
},
91+
}
92+
for _, tc := range testcases {
93+
t.Run(tc.name, func(t *testing.T) {
94+
picker := FailureDomainPicker{Log: tc.logger}
95+
fd := picker.PickFewest(tc.fds, tc.machines)
96+
97+
found := false
98+
for _, expectation := range tc.expected {
99+
if fd == expectation {
100+
found = true
101+
}
102+
}
103+
if !found {
104+
t.Fatal("did not find expected value")
105+
}
106+
})
107+
}
108+
}

test/infrastructure/docker/api/v1alpha3/dockercluster_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package v1alpha3
1818

1919
import (
2020
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21+
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
2122
)
2223

2324
const (
@@ -34,12 +35,23 @@ type DockerClusterSpec struct {
3435
// ControlPlaneEndpoint represents the endpoint used to communicate with the control plane.
3536
// +optional
3637
ControlPlaneEndpoint APIEndpoint `json:"controlPlaneEndpoint"`
38+
39+
// FailureDomains are not usually defined on the spec.
40+
// The docker provider is special since failure domains don't mean anything in a local docker environment.
41+
// Instead, the docker cluster controller will simply copy these into the Status and allow the Cluster API
42+
// controllers to do what they will with the defined failure domains.
43+
// +optional
44+
FailureDomains clusterv1.FailureDomains `json:"failureDomains,omitempty"`
3745
}
3846

3947
// DockerClusterStatus defines the observed state of DockerCluster.
4048
type DockerClusterStatus struct {
4149
// Ready denotes that the docker cluster (infrastructure) is ready.
4250
Ready bool `json:"ready"`
51+
52+
// FailureDomains don't mean much in CAPD since it's all local, but we can see how the rest of cluster API
53+
// will use this if we populate it.
54+
FailureDomains clusterv1.FailureDomains `json:"failureDomains,omitempty"`
4355
}
4456

4557
// APIEndpoint represents a reachable Kubernetes API endpoint.

test/infrastructure/docker/api/v1alpha3/zz_generated.deepcopy.go

Lines changed: 17 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)