Skip to content

🌱 Add etcd endpoint health check to KCP #3810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controlplane/kubeadm/internal/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (m *Management) getApiServerEtcdClientCert(ctx context.Context, clusterKey

type healthCheck func(context.Context) (HealthCheckResult, error)

// HealthCheck will run a generic health check function and report any errors discovered.
// healthCheck will run a generic health check function and report any errors discovered.
// In addition to the health check, it also ensures there is a 1;1 match between nodes and machines.
func (m *Management) healthCheck(ctx context.Context, check healthCheck, clusterKey client.ObjectKey) error {
var errorList []error
Expand Down
15 changes: 15 additions & 0 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pkg/errors"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"google.golang.org/grpc"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
Expand All @@ -46,6 +47,7 @@ type etcd interface {
MemberUpdate(ctx context.Context, id uint64, peerURLs []string) (*clientv3.MemberUpdateResponse, error)
MoveLeader(ctx context.Context, id uint64) (*clientv3.MoveLeaderResponse, error)
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
}

// Client wraps an etcd client formatting its output to something more consumable.
Expand Down Expand Up @@ -234,3 +236,16 @@ func (c *Client) Alarms(ctx context.Context) ([]MemberAlarm, error) {

return memberAlarms, nil
}

// IsEndpointHealthy checks the healthiness of endpoints specified in endpoints during Client creation.
// Using the same logic used in etcdctl health command:
// "get a random key. As long as we can get the response without an error, the endpoint is health."
func (c *Client) IsEndpointHealthy(ctx context.Context) error {
_, err := c.EtcdClient.Get(ctx, "health")

// permission denied is OK since it reports the endpoint is working, no matter of the grants on the specific key
if err == nil || err == rpctypes.ErrPermissionDenied {
return nil
}
return err
}
1 change: 1 addition & 0 deletions controlplane/kubeadm/internal/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func TestEtcdMembers_WithSuccess(t *testing.T) {
MemberRemoveResponse: &clientv3.MemberRemoveResponse{},
AlarmResponse: &clientv3.AlarmResponse{},
StatusResponse: &clientv3.StatusResponse{},
GetResponse: &clientv3.GetResponse{},
}

client, err := newEtcdClient(ctx, fakeEtcdClient)
Expand Down
5 changes: 5 additions & 0 deletions controlplane/kubeadm/internal/etcd/fake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type FakeEtcdClient struct {
MemberUpdateResponse *clientv3.MemberUpdateResponse
MoveLeaderResponse *clientv3.MoveLeaderResponse
StatusResponse *clientv3.StatusResponse
GetResponse *clientv3.GetResponse
ErrorResponse error
MovedLeader uint64
RemovedMember uint64
Expand Down Expand Up @@ -65,3 +66,7 @@ func (c *FakeEtcdClient) MemberUpdate(_ context.Context, _ uint64, _ []string) (
func (c *FakeEtcdClient) Status(_ context.Context, _ string) (*clientv3.StatusResponse, error) {
return c.StatusResponse, nil
}

func (c *FakeEtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
return c.GetResponse, nil
}
3 changes: 1 addition & 2 deletions controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import (
"crypto/tls"

"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
Expand Down
28 changes: 27 additions & 1 deletion controlplane/kubeadm/internal/workload_cluster_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)

expectedMembers := 0
response := make(map[string]error)
var memberList []*etcd.Member
nodesReportSameMemberList := true
for _, node := range controlPlaneNodes.Items {
name := node.Name
response[name] = nil
Expand Down Expand Up @@ -85,14 +87,26 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
}
defer etcdClient.Close()

// Check healthiness of the etcd endpoint by trying to get a random key.
if err := etcdClient.IsEndpointHealthy(ctx); err != nil {
response[name] = errors.Wrap(err, "etcd member is unhealthy")
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
continue
}

memberList = members

member := etcdutil.MemberForName(members, name)
if member == nil {
response[name] = errors.Wrap(err, "failed to find the member for the machine")
continue
}

// Check that the member reports no alarms.
if len(member.Alarms) > 0 {
Expand All @@ -116,9 +130,10 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
} else {
unknownMembers := memberIDSet.Difference(knownMemberIDSet)
if unknownMembers.Len() > 0 {
nodesReportSameMemberList = false
response[name] = errors.Errorf("etcd member reports members IDs %v, but all previously seen etcd members reported member IDs %v", memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())
continue
}
continue
}
}

Expand All @@ -130,6 +145,17 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
return response, errors.Errorf("there are %d healthy etcd pods, but %d etcd members", expectedMembers, len(knownMemberIDSet))
}

// If the nodes are reporting a consistent member list make a further check ensuring that all the etcd members
// have a corresponding node (so there should not be "external" etcd members).
if nodesReportSameMemberList {
for _, m := range memberList {
// NOTE: we are using response for this check because it has a 1:1 correspondence with control plane nodes and map does support existence check
if _, ok := response[m.Name]; !ok {
return response, errors.Errorf("etcd member %q does not have a corresponding control plane nodes", m.Name)
}
}
}

return response, nil
}

Expand Down
2 changes: 2 additions & 0 deletions controlplane/kubeadm/internal/workload_cluster_etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ func TestWorkload_EtcdIsHealthy(t *testing.T) {
AlarmResponse: &clientv3.AlarmResponse{
Alarms: []*pb.AlarmMember{},
},
GetResponse: &clientv3.GetResponse{},
},
},
},
}
ctx := context.Background()
health, err := workload.EtcdIsHealthy(ctx)
g.Expect(err).NotTo(HaveOccurred())

Expand Down