Commit 0d5177f

Ensure we are not blocking if the operator is not able to get the Pod client (FoundationDB#1532)
1 parent e0b1434 commit 0d5177f
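
At a high level, this change moves the per-Pod update selection into a getPodsToUpdate helper that skips a Pod when its client or variable substitutions cannot be fetched, instead of requeueing the whole reconciliation. The sketch below is only an illustration of that skip-instead-of-abort pattern; the pod, podClient, and getPodClient names are simplified stand-ins, not the operator's real API.

package main

import (
	"errors"
	"fmt"
)

// pod and podClient are simplified stand-ins for the operator's types;
// they are illustrative only and not part of the fdb-kubernetes-operator API.
type pod struct{ name, zone string }

type podClient struct{ substitutions map[string]string }

// getPodClient simulates a lookup that can fail for an individual Pod,
// for example when its sidecar is not reachable yet.
func getPodClient(p pod) (*podClient, error) {
	if p.name == "storage-2" {
		return nil, errors.New("sidecar not reachable")
	}
	return &podClient{substitutions: map[string]string{"FDB_ZONE_ID": p.zone}}, nil
}

// podsToUpdate groups Pods by fault domain. A Pod whose client information
// cannot be fetched is logged and skipped, so one unreachable Pod no longer
// blocks updates for the rest of the cluster.
func podsToUpdate(pods []pod) map[string][]pod {
	updates := make(map[string][]pod)
	for _, p := range pods {
		client, err := getPodClient(p)
		if err != nil {
			// Non-blocking: log and move on to the next Pod.
			fmt.Printf("skipping Pod %s: %v\n", p.name, err)
			continue
		}
		zone := client.substitutions["FDB_ZONE_ID"]
		updates[zone] = append(updates[zone], p)
	}
	return updates
}

func main() {
	pods := []pod{{"storage-1", "zone-a"}, {"storage-2", "zone-b"}, {"storage-3", "zone-a"}}
	fmt.Println(podsToUpdate(pods))
}

Cluster-level failures (listing Pods, creating the admin client) still return a requeue in the actual change, now marked with delayedRequeue: true, as the diff below shows.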

File tree

3 files changed: +144 −62 lines changed


controllers/suite_test.go

Lines changed: 1 addition & 0 deletions
@@ -155,6 +155,7 @@ func reconcileObject(reconciler reconcile.Reconciler, metadata metav1.ObjectMeta
 			log.Info("Reconciliation successful")
 		}
 	}
+
 	return result, err
 }

controllers/update_pods.go

Lines changed: 84 additions & 59 deletions
@@ -3,7 +3,7 @@
  *
  * This source file is part of the FoundationDB open source project
  *
- * Copyright 2019-2021 Apple Inc. and the FoundationDB project authors
+ * Copyright 2019-2023 Apple Inc. and the FoundationDB project authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,11 +44,48 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile

 	pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
 	if err != nil {
-		return &requeue{curError: err}
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+
+	updates, err := getPodsToUpdate(logger, r, cluster, internal.CreatePodMap(cluster, pods))
+	if err != nil {
+		return &requeue{curError: err, delay: podSchedulingDelayDuration, delayedRequeue: true}
+	}
+
+	if len(updates) > 0 {
+		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
+			logger.Info("Requeuing reconciliation to replace pods")
+			return &requeue{message: "Requeueing reconciliation to replace pods"}
+		}
+
+		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
+			r.Recorder.Event(cluster, corev1.EventTypeNormal,
+				"NeedsPodsDeletion", "Spec require deleting some pods, but deleting pods is disabled")
+			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
+			err = r.updateOrApply(ctx, cluster)
+			if err != nil {
+				logger.Error(err, "Error updating cluster status")
+			}
+			return &requeue{message: "Pod deletion is disabled"}
+		}
 	}

+	if len(updates) == 0 {
+		return nil
+	}
+
+	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
+	if err != nil {
+		return &requeue{curError: err, delayedRequeue: true}
+	}
+	defer adminClient.Close()
+
+	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+}
+
+// getPodsToUpdate returns a map of Zone to Pods mapping. The map has the fault domain as key and all Pods in that fault domain will be present as a slice of *corev1.Pod.
+func getPodsToUpdate(logger logr.Logger, reconciler *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster, podMap map[fdbv1beta2.ProcessGroupID]*corev1.Pod) (map[string][]*corev1.Pod, error) {
 	updates := make(map[string][]*corev1.Pod)
-	podMap := internal.CreatePodMap(cluster, pods)

 	for _, processGroup := range cluster.Status.ProcessGroups {
 		if processGroup.IsMarkedForRemoval() {
@@ -74,90 +111,78 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile
 			logger.V(1).Info("Could not find Pod for process group ID",
 				"processGroupID", processGroup.ProcessGroupID)
 			continue
-			// TODO should not be continue but rather be a requeue?
 		}

 		if shouldRequeueDueToTerminatingPod(pod, cluster, processGroup.ProcessGroupID) {
-			return &requeue{message: "Cluster has pod that is pending deletion", delay: podSchedulingDelayDuration, delayedRequeue: true}
+			return nil, fmt.Errorf("cluster has Pod %s that is pending deletion", pod.Name)
 		}

 		_, idNum, err := podmanager.ParseProcessGroupID(processGroup.ProcessGroupID)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error parsing Process Group ID",
+				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}

 		processClass, err := podmanager.GetProcessClass(cluster, pod)
 		if err != nil {
-			return &requeue{curError: err}
+			logger.Info("Skipping Pod due to error fetching process class",
				"processGroupID", processGroup.ProcessGroupID,
+				"error", err.Error())
+			continue
 		}

 		specHash, err := internal.GetPodSpecHash(cluster, processClass, idNum, nil)
 		if err != nil {
-			return &requeue{curError: err}
-		}
-
-		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] != specHash {
-			logger.Info("Update Pod",
+			logger.Info("Skipping Pod due to error generating spec hash",
 				"processGroupID", processGroup.ProcessGroupID,
-				"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))
-
-			podClient, message := r.getPodClient(cluster, pod)
-			if podClient == nil {
-				return &requeue{message: message, delay: podSchedulingDelayDuration}
-			}
-
-			substitutions, err := podClient.GetVariableSubstitutions()
-			if err != nil {
-				return &requeue{curError: err}
-			}
+				"error", err.Error())
+			continue
+		}

-			if substitutions == nil {
-				logger.Info("Skipping pod due to missing locality information",
-					"processGroupID", processGroup.ProcessGroupID)
-				continue
-			}
+		// The Pod is updated, so we can continue.
+		if pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey] == specHash {
+			continue
+		}

-			zone := substitutions["FDB_ZONE_ID"]
-			if r.InSimulation {
-				zone = "simulation"
-			}
+		logger.Info("Update Pod",
+			"processGroupID", processGroup.ProcessGroupID,
+			"reason", fmt.Sprintf("specHash has changed from %s to %s", specHash, pod.ObjectMeta.Annotations[fdbv1beta2.LastSpecKey]))

-			if updates[zone] == nil {
-				updates[zone] = make([]*corev1.Pod, 0)
-			}
-			updates[zone] = append(updates[zone], pod)
+		podClient, message := reconciler.getPodClient(cluster, pod)
+		if podClient == nil {
+			logger.Info("Skipping Pod due to missing Pod client information",
+				"processGroupID", processGroup.ProcessGroupID,
+				"message", message)
+			continue
 		}
-	}

-	if len(updates) > 0 {
-		if cluster.Spec.AutomationOptions.PodUpdateStrategy == fdbv1beta2.PodUpdateStrategyReplacement {
-			logger.Info("Requeuing reconciliation to replace pods")
-			return &requeue{message: "Requeueing reconciliation to replace pods"}
+		substitutions, err := podClient.GetVariableSubstitutions()
+		if err != nil {
+			logger.Info("Skipping Pod due to missing variable substitutions",
+				"processGroupID", processGroup.ProcessGroupID)
+			continue
 		}

-		if r.PodLifecycleManager.GetDeletionMode(cluster) == fdbv1beta2.PodUpdateModeNone {
-			r.Recorder.Event(cluster, corev1.EventTypeNormal,
-				"NeedsPodsDeletion", "Spec require deleting some pods, but deleting pods is disabled")
-			cluster.Status.Generations.NeedsPodDeletion = cluster.ObjectMeta.Generation
-			err = r.updateOrApply(ctx, cluster)
-			if err != nil {
-				logger.Error(err, "Error updating cluster status")
-			}
-			return &requeue{message: "Pod deletion is disabled"}
+		if substitutions == nil {
+			logger.Info("Skipping Pod due to missing locality information",
+				"processGroupID", processGroup.ProcessGroupID)
+			continue
 		}
-	}

-	if len(updates) == 0 {
-		return nil
-	}
+		zone := substitutions["FDB_ZONE_ID"]
+		if reconciler.InSimulation {
+			zone = "simulation"
+		}

-	adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client)
-	if err != nil {
-		return &requeue{curError: err}
+		if updates[zone] == nil {
+			updates[zone] = make([]*corev1.Pod, 0)
+		}
+		updates[zone] = append(updates[zone], pod)
 	}
-	defer adminClient.Close()

-	return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger)
+	return updates, nil
 }

 func shouldRequeueDueToTerminatingPod(pod *corev1.Pod, cluster *fdbv1beta2.FoundationDBCluster, processGroupID fdbv1beta2.ProcessGroupID) bool {

controllers/update_pods_test.go

Lines changed: 59 additions & 3 deletions
@@ -21,9 +21,13 @@
 package controllers

 import (
+	"context"
 	"fmt"
 	"time"

+	"github.com/FoundationDB/fdb-kubernetes-operator/internal"
+	ctrlClient "sigs.k8s.io/controller-runtime/pkg/client"
+
 	"k8s.io/utils/pointer"

 	fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2"
@@ -112,9 +116,10 @@ var _ = Describe("update_pods", func() {
 	Context("Validating shouldRequeueDueToTerminatingPod", func() {
 		var processGroup = fdbv1beta2.ProcessGroupID("")

-		When("pod is without deletionTimestamp", func() {
+		When("Pod is without deletionTimestamp", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -129,9 +134,10 @@ var _ = Describe("update_pods", func() {
 			})
 		})

-		When("pod with deletionTimestamp less than ignore limit", func() {
+		When("Pod with deletionTimestamp less than ignore limit", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -147,9 +153,10 @@ var _ = Describe("update_pods", func() {
 			})
 		})

-		When("pod with deletionTimestamp more than ignore limit", func() {
+		When("Pod with deletionTimestamp more than ignore limit", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{}
 				pod = &corev1.Pod{
@@ -169,6 +176,7 @@ var _ = Describe("update_pods", func() {
 		When("with configured IgnoreTerminatingPodsSeconds", func() {
 			var cluster *fdbv1beta2.FoundationDBCluster
 			var pod *corev1.Pod
+
 			BeforeEach(func() {
 				cluster = &fdbv1beta2.FoundationDBCluster{
 					Spec: fdbv1beta2.FoundationDBClusterSpec{
@@ -191,4 +199,52 @@ var _ = Describe("update_pods", func() {
 			})
 		})
 	})
+
+	When("fetching all Pods that needs an update", func() {
+		var cluster *fdbv1beta2.FoundationDBCluster
+		var updates map[string][]*corev1.Pod
+		var expectedError bool
+
+		BeforeEach(func() {
+			cluster = internal.CreateDefaultCluster()
+			Expect(k8sClient.Create(context.TODO(), cluster)).NotTo(HaveOccurred())
+			result, err := reconcileCluster(cluster)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(result.Requeue).To(BeFalse())
+			Expect(k8sClient.Get(context.TODO(), ctrlClient.ObjectKeyFromObject(cluster), cluster)).NotTo(HaveOccurred())
+		})
+
+		JustBeforeEach(func() {
+			pods, err := clusterReconciler.PodLifecycleManager.GetPods(context.TODO(), k8sClient, cluster, internal.GetPodListOptions(cluster, "", "")...)
+			Expect(err).NotTo(HaveOccurred())
+
+			updates, err = getPodsToUpdate(log, clusterReconciler, cluster, internal.CreatePodMap(cluster, pods))
+			if !expectedError {
+				Expect(err).NotTo(HaveOccurred())
+			} else {
+				Expect(err).To(HaveOccurred())
+			}
+		})
+
+		When("the cluster has no changes", func() {
+			It("should return no errors and an empty map", func() {
+				Expect(updates).To(HaveLen(0))
+			})
+		})
+
+		When("there is a spec change for all processes", func() {
+			BeforeEach(func() {
+				storageSettings := cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral]
+				storageSettings.PodTemplate.Spec.NodeSelector = map[string]string{"test": "test"}
+				cluster.Spec.Processes[fdbv1beta2.ProcessClassGeneral] = storageSettings
+
+				Expect(k8sClient.Update(context.TODO(), cluster)).NotTo(HaveOccurred())
+			})
+
+			It("should return no errors and a map with one zone", func() {
+				// We only have one zone in this case, the simulation zone
+				Expect(updates).To(HaveLen(1))
+			})
+		})
+	})
 })
