
Commit 4d89d56

Merge pull request #2295 from chuckha/hcs
✨ Adds healthcheck for workload clusters
2 parents 8acdffa + c6d3f31 commit 4d89d56

6 files changed: +945 -3 lines changed
Lines changed: 394 additions & 0 deletions
@@ -0,0 +1,394 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
	"context"
	"crypto/rand"
	"crypto/rsa"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"time"

	"github.com/pkg/errors"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	kerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controllers/remote"
	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
	etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
	"sigs.k8s.io/cluster-api/util/certs"
	"sigs.k8s.io/cluster-api/util/secret"
	"sigs.k8s.io/controller-runtime/pkg/client"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)
// ManagementCluster holds operations on the ManagementCluster
type ManagementCluster struct {
	Client ctrlclient.Client
}

// OwnedControlPlaneMachines returns a MachineFilter function to find all owned control plane machines.
// Usage: managementCluster.GetMachinesForCluster(ctx, cluster, OwnedControlPlaneMachines(controlPlane.Name))
func OwnedControlPlaneMachines(controlPlaneName string) func(machine clusterv1.Machine) bool {
	return func(machine clusterv1.Machine) bool {
		controllerRef := metav1.GetControllerOf(&machine)
		if controllerRef == nil {
			return false
		}
		return controllerRef.Kind == "KubeadmControlPlane" && controllerRef.Name == controlPlaneName
	}
}

// GetMachinesForCluster returns a list of machines that can be filtered or not.
// If no filter is supplied then all machines associated with the target cluster are returned.
func (m *ManagementCluster) GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...func(machine clusterv1.Machine) bool) ([]clusterv1.Machine, error) {
	selector := map[string]string{
		clusterv1.ClusterLabelName: cluster.Name,
	}
	allMachines := &clusterv1.MachineList{}
	if err := m.Client.List(ctx, allMachines, client.InNamespace(cluster.Namespace), client.MatchingLabels(selector)); err != nil {
		return nil, errors.Wrap(err, "failed to list machines")
	}
	if len(filters) == 0 {
		return allMachines.Items, nil
	}
	filteredMachines := []clusterv1.Machine{}
	for _, machine := range allMachines.Items {
		add := true
		for _, filter := range filters {
			if !filter(machine) {
				add = false
				break
			}
		}
		if add {
			filteredMachines = append(filteredMachines, machine)
		}
	}
	return filteredMachines, nil
}
// getCluster builds a cluster object.
// The cluster is also populated with secrets stored on the management cluster that are required for
// secure internal pod connections.
func (m *ManagementCluster) getCluster(ctx context.Context, clusterKey types.NamespacedName) (*cluster, error) {
	// This adapter is for interop with the `remote` package.
	adapterCluster := &clusterv1.Cluster{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: clusterKey.Namespace,
			Name:      clusterKey.Name,
		},
	}

	// TODO(chuckha): Unroll remote.NewClusterClient if we are unhappy with getting a restConfig twice.
	restConfig, err := remote.RESTConfig(ctx, m.Client, adapterCluster)
	if err != nil {
		return nil, err
	}

	c, err := remote.NewClusterClient(ctx, m.Client, adapterCluster, scheme.Scheme)
	if err != nil {
		return nil, err
	}
	etcdCACert, etcdCAKey, err := m.GetEtcdCerts(ctx, clusterKey)
	if err != nil {
		return nil, err
	}
	return &cluster{
		client:     c,
		restConfig: restConfig,
		etcdCACert: etcdCACert,
		etcdCAkey:  etcdCAKey,
	}, nil
}

// GetEtcdCerts returns the EtcdCA Cert and Key for a given cluster.
func (m *ManagementCluster) GetEtcdCerts(ctx context.Context, cluster types.NamespacedName) ([]byte, []byte, error) {
	etcdCASecret := &corev1.Secret{}
	etcdCAObjectKey := types.NamespacedName{
		Namespace: cluster.Namespace,
		Name:      fmt.Sprintf("%s-etcd", cluster.Name),
	}
	if err := m.Client.Get(ctx, etcdCAObjectKey, etcdCASecret); err != nil {
		return nil, nil, errors.Wrapf(err, "failed to get secret; etcd CA bundle %s/%s", etcdCAObjectKey.Namespace, etcdCAObjectKey.Name)
	}
	etcdCACertData, ok := etcdCASecret.Data[secret.TLSCrtDataName]
	if !ok {
		return nil, nil, errors.New("empty ca certificate")
	}
	etcdCAKeyData, ok := etcdCASecret.Data[secret.TLSKeyDataName]
	if !ok {
		return nil, nil, errors.New("empty ca key")
	}
	return etcdCACertData, etcdCAKeyData, nil
}
// TargetClusterEtcdIsHealthy runs a series of checks over a target cluster's etcd cluster.
// In addition, it verifies that there are the same number of etcd members as control plane Machines.
func (m *ManagementCluster) TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error {
	cluster, err := m.getCluster(ctx, clusterKey)
	if err != nil {
		return err
	}
	resp, err := cluster.etcdIsHealthy(ctx)
	if err != nil {
		return err
	}
	errorList := []error{}
	for nodeName, err := range resp {
		if err != nil {
			errorList = append(errorList, fmt.Errorf("node %q: %v", nodeName, err))
		}
	}
	if len(errorList) != 0 {
		return kerrors.NewAggregate(errorList)
	}

	// Make sure Cluster API is aware of all the nodes.
	machines, err := m.GetMachinesForCluster(ctx, clusterKey, OwnedControlPlaneMachines(controlPlaneName))
	if err != nil {
		return err
	}

	// This check ensures there is a 1 to 1 correspondence of nodes and machines.
	// Iterate through all machines ensuring that every control plane machine that Cluster API knows about has been
	// checked and exists in the response. If a machine was not checked then etcd is not considered healthy.
	for _, machine := range machines {
		if machine.Status.NodeRef == nil {
			return errors.Errorf("control plane machine %q has no node ref", machine.Name)
		}
		if _, ok := resp[machine.Status.NodeRef.Name]; !ok {
			return errors.Errorf("machine's (%q) node (%q) was not checked", machine.Name, machine.Status.NodeRef.Name)
		}
	}
	if len(resp) != len(machines) {
		return errors.Errorf("number of nodes and machines did not correspond: %d nodes %d machines", len(resp), len(machines))
	}
	return nil
}
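// A minimal usage sketch, assuming a caller that already has a client for the
// management cluster (mgmtClient), the KubeadmControlPlane object (kcp) and the
// workload cluster's name (clusterName); these names are placeholders and the
// real controller wiring may differ:
//
//	m := &ManagementCluster{Client: mgmtClient}
//	key := types.NamespacedName{Namespace: kcp.Namespace, Name: clusterName}
//	if err := m.TargetClusterEtcdIsHealthy(ctx, key, kcp.Name); err != nil {
//		// etcd is not considered healthy; hold off on scale up, scale down or upgrade.
//	}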
// ControlPlaneLabelsForCluster returns a set of labels to add to a control plane machine for this specific cluster.
func (m *ManagementCluster) ControlPlaneLabelsForCluster(clusterName string) map[string]string {
	return map[string]string{
		clusterv1.ClusterLabelName:             clusterName,
		clusterv1.MachineControlPlaneLabelName: "",
	}
}

// ControlPlaneSelectorForCluster returns the label selector necessary to get control plane machines for a given cluster.
func (m *ManagementCluster) ControlPlaneSelectorForCluster(clusterName string) *metav1.LabelSelector {
	return &metav1.LabelSelector{
		MatchLabels: m.ControlPlaneLabelsForCluster(clusterName),
	}
}

// cluster holds operations on target clusters.
type cluster struct {
	client ctrlclient.Client
	// restConfig is required for the proxy.
	restConfig            *rest.Config
	etcdCACert, etcdCAkey []byte
}
// generateEtcdTLSClientBundle builds an etcd client TLS bundle from the Etcd CA for this cluster.
func (c *cluster) generateEtcdTLSClientBundle() (*tls.Config, error) {
	clientCert, err := generateClientCert(c.etcdCACert, c.etcdCAkey)
	if err != nil {
		return nil, err
	}

	caPool := x509.NewCertPool()
	caPool.AppendCertsFromPEM(c.etcdCACert)

	return &tls.Config{
		RootCAs:      caPool,
		Certificates: []tls.Certificate{clientCert},
	}, nil
}

// etcdIsHealthyResponse is a map of node names to the etcd health check failure for that node, or nil if no checks failed.
type etcdIsHealthyResponse map[string]error
// etcdIsHealthy runs checks for every etcd member in the cluster to satisfy our definition of healthy.
// This is a best effort check and nodes can become unhealthy after the check is complete. It is not a guarantee.
// It's used as a signal for whether we should allow a target cluster to scale up, scale down or upgrade.
// It returns a list of nodes checked so the layer above can confirm it has the correct number of nodes expected.
func (c *cluster) etcdIsHealthy(ctx context.Context) (etcdIsHealthyResponse, error) {
	var knownClusterID uint64
	var knownMemberIDSet etcdutil.UInt64Set

	controlPlaneNodes := &corev1.NodeList{}
	controlPlaneNodeLabels := map[string]string{
		"node-role.kubernetes.io/master": "",
	}

	if err := c.client.List(ctx, controlPlaneNodes, client.MatchingLabels(controlPlaneNodeLabels)); err != nil {
		return nil, err
	}

	tlsConfig, err := c.generateEtcdTLSClientBundle()
	if err != nil {
		return nil, err
	}

	response := map[string]error{}
	for _, node := range controlPlaneNodes.Items {
		name := node.Name
		response[name] = nil
		if node.Spec.ProviderID == "" {
			response[name] = errors.New("empty provider ID")
			continue
		}

		// Create the etcd client for the etcd Pod scheduled on the Node
		etcdClient, err := c.getEtcdClientForNode(name, tlsConfig)
		if err != nil {
			response[name] = errors.Wrap(err, "failed to create etcd client")
			continue
		}

		// List etcd members. This checks that the member is healthy, because the request goes through consensus.
		members, err := etcdClient.Members(ctx)
		if err != nil {
			response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
			continue
		}
		member := etcdutil.MemberForName(members, name)

		// Check that the member reports no alarms.
		if len(member.Alarms) > 0 {
			response[name] = errors.Errorf("etcd member reports alarms: %v", member.Alarms)
			continue
		}

		// Check that the member belongs to the same cluster as all other members.
		clusterID := member.ClusterID
		if knownClusterID == 0 {
			knownClusterID = clusterID
		} else if knownClusterID != clusterID {
			response[name] = errors.Errorf("etcd member has cluster ID %d, but all previously seen etcd members have cluster ID %d", clusterID, knownClusterID)
			continue
		}

		// Check that the member list is stable.
		memberIDSet := etcdutil.MemberIDSet(members)
		if knownMemberIDSet.Len() == 0 {
			knownMemberIDSet = memberIDSet
		} else {
			unknownMembers := memberIDSet.Difference(knownMemberIDSet)
			if unknownMembers.Len() > 0 {
				response[name] = errors.Errorf("etcd member reports member IDs %v, but all previously seen etcd members reported member IDs %v", memberIDSet.UnsortedList(), knownMemberIDSet.UnsortedList())
			}
			continue
		}
	}

	// Check that there is exactly one etcd member for every control plane machine.
	// There should be no etcd members added "out of band".
	if len(controlPlaneNodes.Items) != len(knownMemberIDSet) {
		return response, errors.Errorf("there are %d control plane nodes, but %d etcd members", len(controlPlaneNodes.Items), len(knownMemberIDSet))
	}

	return response, nil
}
// getEtcdClientForNode returns a client that talks directly to an etcd instance living on a particular node.
func (c *cluster) getEtcdClientForNode(nodeName string, tlsConfig *tls.Config) (*etcd.Client, error) {
	// This does not support external etcd.
	p := proxy.Proxy{
		Kind:         "pods",
		Namespace:    "kube-system", // TODO, can etcd ever run in a different namespace?
		ResourceName: etcdStaticPodName(nodeName),
		KubeConfig:   c.restConfig,
		TLSConfig:    tlsConfig,
		Port:         2379, // TODO: the pod doesn't expose a port. Is this a problem?
	}
	dialer, err := proxy.NewDialer(p)
	if err != nil {
		return nil, err
	}
	etcdclient, err := etcd.NewEtcdClient("127.0.0.1", dialer.DialContextWithAddr, tlsConfig)
	if err != nil {
		return nil, err
	}
	customClient, err := etcd.NewClientWithEtcd(etcdclient)
	if err != nil {
		return nil, err
	}
	return customClient, nil
}

func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) {
	privKey, err := certs.NewPrivateKey()
	if err != nil {
		return tls.Certificate{}, err
	}
	caCert, err := certs.DecodeCertPEM(caCertEncoded)
	if err != nil {
		return tls.Certificate{}, err
	}
	caKey, err := certs.DecodePrivateKeyPEM(caKeyEncoded)
	if err != nil {
		return tls.Certificate{}, err
	}
	x509Cert, err := newClientCert(caCert, privKey, caKey)
	if err != nil {
		return tls.Certificate{}, err
	}
	return tls.X509KeyPair(certs.EncodeCertPEM(x509Cert), certs.EncodePrivateKeyPEM(privKey))
}

func newClientCert(caCert *x509.Certificate, key *rsa.PrivateKey, caKey *rsa.PrivateKey) (*x509.Certificate, error) {
	cfg := certs.Config{
		CommonName: "cluster-api.x-k8s.io",
	}

	now := time.Now().UTC()

	tmpl := x509.Certificate{
		SerialNumber: new(big.Int).SetInt64(0),
		Subject: pkix.Name{
			CommonName:   cfg.CommonName,
			Organization: cfg.Organization,
		},
		NotBefore:   now.Add(time.Minute * -5),
		NotAfter:    now.Add(time.Hour * 24 * 365 * 10), // 10 years
		KeyUsage:    x509.KeyUsageDigitalSignature,
		ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
	}

	b, err := x509.CreateCertificate(rand.Reader, &tmpl, caCert, key.Public(), caKey)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to create signed client certificate: %+v", tmpl)
	}

	c, err := x509.ParseCertificate(b)
	return c, errors.WithStack(err)
}

func etcdStaticPodName(nodeName string) string {
	return "etcd-" + nodeName
}
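For context, a minimal sketch of how another package in this module might consume the new machine-filtering API. The package name, helper name and object keys below are assumptions for illustration and are not part of this commit; only ManagementCluster, GetMachinesForCluster and OwnedControlPlaneMachines come from the file above.

package controllers

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
)

// countOwnedControlPlaneMachines is a hypothetical helper that lists only the
// control plane Machines owned by the named KubeadmControlPlane and returns
// how many there are.
func countOwnedControlPlaneMachines(ctx context.Context, m *internal.ManagementCluster, clusterKey types.NamespacedName, controlPlaneName string) (int, error) {
	machines, err := m.GetMachinesForCluster(ctx, clusterKey, internal.OwnedControlPlaneMachines(controlPlaneName))
	if err != nil {
		return 0, err
	}
	return len(machines), nil
}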

0 commit comments
