Skip to content

Commit 2708f8e

Browse files
author
gab-satchi
committed
Adds forNodes that will use all available etcd endpoints
- exclude node being removed from nodelist for etcd client
1 parent 99cb87a commit 2708f8e

File tree

6 files changed

+48
-41
lines changed

6 files changed

+48
-41
lines changed

controlplane/kubeadm/internal/cluster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
116116
RootCAs: caPool,
117117
Certificates: []tls.Certificate{clientCert},
118118
}
119-
119+
cfg.InsecureSkipVerify = true
120120
return &Workload{
121121
Client: c,
122122
CoreDNSMigrator: &CoreDNSMigrator{},

controlplane/kubeadm/internal/etcd/etcd.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ func pbMemberToMember(m *etcdserverpb.Member) *Member {
112112
}
113113

114114
// NewEtcdClient creates a new etcd client with a custom dialer and is configuration with optional functions.
115-
func NewEtcdClient(endpoint string, dialer GRPCDial, tlsConfig *tls.Config) (*clientv3.Client, error) {
115+
func NewEtcdClient(endpoints []string, dialer GRPCDial, tlsConfig *tls.Config) (*clientv3.Client, error) {
116116
etcdClient, err := clientv3.New(clientv3.Config{
117-
Endpoints: []string{endpoint},
117+
Endpoints: endpoints,
118118
DialTimeout: etcdTimeout,
119119
DialOptions: []grpc.DialOption{
120120
grpc.WithBlock(), // block until the underlying connection is up
@@ -125,7 +125,6 @@ func NewEtcdClient(endpoint string, dialer GRPCDial, tlsConfig *tls.Config) (*cl
125125
if err != nil {
126126
return nil, errors.Wrap(err, "unable to create etcd client")
127127
}
128-
etcdClient.Endpoints()
129128
return etcdClient, nil
130129
}
131130

controlplane/kubeadm/internal/etcd_client_generator.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ package internal
1919
import (
2020
"context"
2121
"crypto/tls"
22-
2322
"github.com/pkg/errors"
23+
kerrors "k8s.io/apimachinery/pkg/util/errors"
24+
2425
corev1 "k8s.io/api/core/v1"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26-
kerrors "k8s.io/apimachinery/pkg/util/errors"
2727
"k8s.io/client-go/rest"
2828
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
2929
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
@@ -35,21 +35,24 @@ type etcdClientGenerator struct {
3535
tlsConfig *tls.Config
3636
}
3737

38-
func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.Client, error) {
39-
// This does not support external etcd.
38+
func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
39+
endpoints := make([]string, len(nodes))
40+
for i, node := range nodes {
41+
endpoints[i] = staticPodName("etcd", node.Name)
42+
}
43+
4044
p := proxy.Proxy{
41-
Kind: "pods",
42-
Namespace: metav1.NamespaceSystem, // TODO, can etcd ever run in a different namespace?
43-
ResourceName: staticPodName("etcd", name),
44-
KubeConfig: c.restConfig,
45-
TLSConfig: c.tlsConfig,
46-
Port: 2379, // TODO: the pod doesn't expose a port. Is this a problem?
45+
Kind: "pods",
46+
Namespace: metav1.NamespaceSystem,
47+
KubeConfig: c.restConfig,
48+
TLSConfig: c.tlsConfig,
49+
Port: 2379,
4750
}
4851
dialer, err := proxy.NewDialer(p)
4952
if err != nil {
5053
return nil, err
5154
}
52-
etcdclient, err := etcd.NewEtcdClient("127.0.0.1", dialer.DialContextWithAddr, c.tlsConfig)
55+
etcdclient, err := etcd.NewEtcdClient(endpoints, dialer.DialContextWithAddr, c.tlsConfig)
5356
if err != nil {
5457
return nil, err
5558
}
@@ -61,11 +64,11 @@ func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.C
6164
}
6265

6366
// forLeader takes a list of nodes and returns a client to the leader node
64-
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error) {
67+
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
6568
var errs []error
6669

67-
for _, node := range nodes.Items {
68-
client, err := c.forNode(ctx, node.Name)
70+
for _, node := range nodes {
71+
client, err := c.forNodes(ctx, []corev1.Node{node})
6972
if err != nil {
7073
errs = append(errs, err)
7174
continue
@@ -77,8 +80,8 @@ func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes *corev1.NodeL
7780
continue
7881
}
7982
for _, member := range members {
80-
if member.ID == client.LeaderID {
81-
return c.forNode(ctx, member.Name)
83+
if member.Name == node.Name && member.ID == client.LeaderID {
84+
return c.forNodes(ctx, []corev1.Node{node})
8285
}
8386
}
8487
}

controlplane/kubeadm/internal/proxy/dial.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (d *Dialer) DialContext(_ context.Context, network string, addr string) (ne
8888
Post().
8989
Resource(d.proxy.Kind).
9090
Namespace(d.proxy.Namespace).
91-
Name(d.proxy.ResourceName).
91+
Name(addr).
9292
SubResource("portforward")
9393

9494
dialer := spdy.NewDialer(d.upgrader, &http.Client{Transport: d.proxyTransport}, "POST", req.URL())

controlplane/kubeadm/internal/workload_cluster_etcd.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ import (
3030
)
3131

3232
type etcdClientFor interface {
33-
forNode(ctx context.Context, name string) (*etcd.Client, error)
34-
forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error)
33+
forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
34+
forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
3535
}
3636

3737
// EtcdIsHealthy runs checks for every etcd member in the cluster to satisfy our definition of healthy.
@@ -78,7 +78,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
7878
expectedMembers++
7979

8080
// Create the etcd Client for the etcd Pod scheduled on the Node
81-
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
81+
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
8282
if err != nil {
8383
response[name] = errors.Wrap(err, "failed to create etcd client")
8484
continue
@@ -142,10 +142,8 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context) error {
142142

143143
errs := []error{}
144144
for _, node := range controlPlaneNodes.Items {
145-
name := node.Name
146-
147145
// Create the etcd Client for the etcd Pod scheduled on the Node
148-
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
146+
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
149147
if err != nil {
150148
continue
151149
}
@@ -209,15 +207,22 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus
209207
}
210208

211209
func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
212-
// Pick a different node to talk to etcd
213210
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
214211
if err != nil {
215212
return err
216213
}
217214
if len(controlPlaneNodes.Items) < 2 {
218215
return ErrControlPlaneMinNodes
219216
}
220-
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, controlPlaneNodes)
217+
218+
// Exclude node being removed from etcd client node list
219+
var remainingNodes []corev1.Node
220+
for _, n := range controlPlaneNodes.Items {
221+
if n.Name != name {
222+
remainingNodes = append(remainingNodes, n)
223+
}
224+
}
225+
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, remainingNodes)
221226
if err != nil {
222227
return errors.Wrap(err, "failed to create etcd client")
223228
}
@@ -258,7 +263,7 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
258263
return errors.Wrap(err, "failed to list control plane nodes")
259264
}
260265

261-
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes)
266+
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
262267
if err != nil {
263268
return errors.Wrap(err, "failed to create etcd client")
264269
}

controlplane/kubeadm/internal/workload_cluster_etcd_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestWorkload_EtcdIsHealthy(t *testing.T) {
5757
},
5858
},
5959
etcdClientGenerator: &fakeEtcdClientGenerator{
60-
forNodeClient: &etcd.Client{
60+
forNodesClient: &etcd.Client{
6161
EtcdClient: &fake2.FakeEtcdClient{
6262
EtcdEndpoints: []string{},
6363
MemberListResponse: &clientv3.MemberListResponse{
@@ -228,15 +228,15 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
228228
name: "returns an error if it fails to create the etcd client",
229229
machine: machine,
230230
objs: []runtime.Object{cp1, cp2},
231-
etcdClientGenerator: &fakeEtcdClientGenerator{forLeaderErr: errors.New("no client")},
231+
etcdClientGenerator: &fakeEtcdClientGenerator{forNodesErr: errors.New("no client")},
232232
expectErr: true,
233233
},
234234
{
235235
name: "returns an error if the client errors getting etcd members",
236236
machine: machine,
237237
objs: []runtime.Object{cp1, cp2},
238238
etcdClientGenerator: &fakeEtcdClientGenerator{
239-
forLeaderClient: &etcd.Client{
239+
forNodesClient: &etcd.Client{
240240
EtcdClient: &fake2.FakeEtcdClient{
241241
ErrorResponse: errors.New("cannot get etcd members"),
242242
},
@@ -249,7 +249,7 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
249249
machine: machine,
250250
objs: []runtime.Object{cp1, cp2},
251251
etcdClientGenerator: &fakeEtcdClientGenerator{
252-
forLeaderClient: &etcd.Client{
252+
forNodesClient: &etcd.Client{
253253
EtcdClient: &fake2.FakeEtcdClient{
254254
ErrorResponse: errors.New("cannot remove etcd member"),
255255
MemberListResponse: &clientv3.MemberListResponse{
@@ -272,7 +272,7 @@ func TestRemoveEtcdMemberForMachine(t *testing.T) {
272272
machine: machine,
273273
objs: []runtime.Object{cp1, cp2},
274274
etcdClientGenerator: &fakeEtcdClientGenerator{
275-
forLeaderClient: &etcd.Client{
275+
forNodesClient: &etcd.Client{
276276
EtcdClient: &fake2.FakeEtcdClient{
277277
MemberListResponse: &clientv3.MemberListResponse{
278278
Members: []*pb.Member{
@@ -368,7 +368,7 @@ func TestForwardEtcdLeadership(t *testing.T) {
368368
leaderCandidate: defaultMachine(),
369369
k8sClient: &fakeClient{},
370370
etcdClientGenerator: &fakeEtcdClientGenerator{
371-
forNodeClient: &etcd.Client{
371+
forLeaderClient: &etcd.Client{
372372
EtcdClient: &fake2.FakeEtcdClient{
373373
ErrorResponse: errors.New("cannot get etcd members"),
374374
},
@@ -507,17 +507,17 @@ func TestForwardEtcdLeadership(t *testing.T) {
507507
}
508508

509509
type fakeEtcdClientGenerator struct {
510-
forNodeClient *etcd.Client
510+
forNodesClient *etcd.Client
511511
forLeaderClient *etcd.Client
512-
forNodeErr error
512+
forNodesErr error
513513
forLeaderErr error
514514
}
515515

516-
func (c *fakeEtcdClientGenerator) forNode(_ context.Context, _ string) (*etcd.Client, error) {
517-
return c.forNodeClient, c.forNodeErr
516+
func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
517+
return c.forNodesClient, c.forNodesErr
518518
}
519519

520-
func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ *corev1.NodeList) (*etcd.Client, error) {
520+
func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
521521
return c.forLeaderClient, c.forLeaderErr
522522
}
523523

0 commit comments

Comments
 (0)