Skip to content

Commit 6b648d8

Browse files
author
Sedef
committed
Add etcd quorum check to KCP
1 parent 28c941a commit 6b648d8

File tree

7 files changed

+52
-4
lines changed

7 files changed

+52
-4
lines changed

controlplane/kubeadm/internal/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (m *Management) getApiServerEtcdClientCert(ctx context.Context, clusterKey
181181

182182
type healthCheck func(context.Context) (HealthCheckResult, error)
183183

184-
// HealthCheck will run a generic health check function and report any errors discovered.
184+
// healthCheck will run a generic health check function and report any errors discovered.
185185
// In addition to the health check, it also ensures there is a 1;1 match between nodes and machines.
186186
func (m *Management) healthCheck(ctx context.Context, check healthCheck, clusterKey client.ObjectKey) error {
187187
var errorList []error

controlplane/kubeadm/internal/etcd/etcd.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"github.com/pkg/errors"
2626
"go.etcd.io/etcd/clientv3"
27+
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
2728
"go.etcd.io/etcd/etcdserver/etcdserverpb"
2829
"google.golang.org/grpc"
2930
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
@@ -46,6 +47,7 @@ type etcd interface {
4647
MemberUpdate(ctx context.Context, id uint64, peerURLs []string) (*clientv3.MemberUpdateResponse, error)
4748
MoveLeader(ctx context.Context, id uint64) (*clientv3.MoveLeaderResponse, error)
4849
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
50+
Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
4951
}
5052

5153
// Client wraps an etcd client formatting its output to something more consumable.
@@ -234,3 +236,16 @@ func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {
234236

235237
return memberAlarms, nil
236238
}
239+
240+
// IsEndpointHealthy checks the healthiness of endpoints specified in endpoints during Client creation.
241+
// Using the same logic used in etcdctl health command:
242+
// "get a random key. As long as we can get the response without an error, the endpoint is health."
243+
func (c *Client) IsEndpointHealthy(ctx context.Context) error {
244+
_, err := c.EtcdClient.Get(ctx, "health")
245+
246+
// permission denied is OK since it reports the endpoint is working, no matter of the grants on the specific key
247+
if err == nil || err == rpctypes.ErrPermissionDenied {
248+
return nil
249+
}
250+
return err
251+
}

controlplane/kubeadm/internal/etcd/etcd_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) {
8585
MemberRemoveResponse: &clientv3.MemberRemoveResponse{},
8686
AlarmResponse: &clientv3.AlarmResponse{},
8787
StatusResponse: &clientv3.StatusResponse{},
88+
GetResponse: &clientv3.GetResponse{},
8889
}
8990

9091
client, err := newEtcdClient(ctx, fakeEtcdClient)

controlplane/kubeadm/internal/etcd/fake/client.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type FakeEtcdClient struct {
3030
MemberUpdateResponse *clientv3.MemberUpdateResponse
3131
MoveLeaderResponse *clientv3.MoveLeaderResponse
3232
StatusResponse *clientv3.StatusResponse
33+
GetResponse *clientv3.GetResponse
3334
ErrorResponse error
3435
MovedLeader uint64
3536
RemovedMember uint64
@@ -65,3 +66,7 @@ func (c *FakeEtcdClient) MemberUpdate(_ context.Context, _ uint64, _ []string) (
6566
func (c *FakeEtcdClient) Status(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
6667
return c.StatusResponse, nil
6768
}
69+
70+
func (c *FakeEtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
71+
return c.GetResponse, nil
72+
}

controlplane/kubeadm/internal/etcd_client_generator.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ import (
2121
"crypto/tls"
2222

2323
"github.com/pkg/errors"
24-
kerrors "k8s.io/apimachinery/pkg/util/errors"
25-
2624
corev1 "k8s.io/api/core/v1"
2725
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
kerrors "k8s.io/apimachinery/pkg/util/errors"
2827
"k8s.io/client-go/rest"
2928
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
3029
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"

controlplane/kubeadm/internal/workload_cluster_etcd.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
4949

5050
expectedMembers := 0
5151
response := make(map[string]error)
52+
var memberList []*etcd.Member
53+
nodesReportSameMemberList := true
5254
for _, node := range controlPlaneNodes.Items {
5355
name := node.Name
5456
response[name] = nil
@@ -85,14 +87,26 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
8587
}
8688
defer etcdClient.Close()
8789

90+
// Check healthiness of the etcd endpoint by trying to get a random key.
91+
if err := etcdClient.IsEndpointHealthy(ctx); err != nil {
92+
response[name] = errors.Wrap(err, "etcd member is unhealthy")
93+
continue
94+
}
95+
8896
// List etcd members. This checks that the member is healthy, because the request goes through consensus.
8997
members, err := etcdClient.Members(ctx)
9098
if err != nil {
9199
response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
92100
continue
93101
}
94102

103+
memberList = members
104+
95105
member := etcdutil.MemberForName(members, name)
106+
if member == nil {
107+
response[name] = errors.Wrap(err, "failed to find the member for the machine")
108+
continue
109+
}
96110

97111
// Check that the member reports no alarms.
98112
if len(member.Alarms) > 0 {
@@ -116,9 +130,10 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
116130
} else {
117131
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
118132
if unknownMembers.Len() > 0 {
133+
nodesReportSameMemberList = false
119134
response[name] = errors.Errorf("etcd member reports members IDs %v, but all previously seen etcd members reported member IDs %v", memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())
135+
continue
120136
}
121-
continue
122137
}
123138
}
124139

@@ -130,6 +145,17 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
130145
return response, errors.Errorf("there are %d healthy etcd pods, but %d etcd members", expectedMembers, len(knownMemberIDSet))
131146
}
132147

148+
// If the nodes are reporting a consistent member list make a further check ensuring that all the etcd members
149+
// have a corresponding node (so there should not be "external" etcd members).
150+
if nodesReportSameMemberList {
151+
for _, m := range memberList {
152+
// NOTE: we are using response for this check because it has a 1:1 correspondence with control plane nodes and map does support existence check
153+
if _, ok := response[m.Name]; !ok {
154+
return response, errors.Errorf("etcd member %q does not have a corresponding control plane nodes", m.Name)
155+
}
156+
}
157+
}
158+
133159
return response, nil
134160
}
135161

controlplane/kubeadm/internal/workload_cluster_etcd_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,12 @@ func TestWorkload_EtcdIsHealthy(t *testing.T) {
7070
AlarmResponse: &clientv3.AlarmResponse{
7171
Alarms: []*pb.AlarmMember{},
7272
},
73+
GetResponse: &clientv3.GetResponse{},
7374
},
7475
},
7576
},
7677
}
78+
ctx := context.Background()
7779
health, err := workload.EtcdIsHealthy(ctx)
7880
g.Expect(err).NotTo(HaveOccurred())
7981

0 commit comments

Comments
 (0)