Skip to content

Commit beeedd5

Browse files
committed
(fix) registry pods do not come up again after node failure
[PR 3201](operator-framework#3201) attempted to solve for the issue by deleting the pods stuck in `Terminating` due to unreachable node. However, the logic to do that was included in `EnsureRegistryServer`, which only gets executed if polling is requested by the user. This PR moves the logic of checking for dead pods out of `EnsureRegistryServer`, and puts it in `CheckRegistryServer` instead. This way, if there are any dead pods detected during `CheckRegistryServer`, the value of `healthy` is returned `false`, which in turn triggers `EnsureRegistryServer`.
1 parent 27f347e commit beeedd5

File tree

4 files changed

+133
-36
lines changed

4 files changed

+133
-36
lines changed

pkg/controller/registry/reconciler/configmap.go

+41-8
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@ package reconciler
33

44
import (
55
"context"
6+
"errors"
67
"fmt"
78

89
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
910
hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash"
10-
"github.com/pkg/errors"
11+
pkgerrors "github.com/pkg/errors"
1112
"github.com/sirupsen/logrus"
1213
corev1 "k8s.io/api/core/v1"
1314
rbacv1 "k8s.io/api/rbac/v1"
1415
apierrors "k8s.io/apimachinery/pkg/api/errors"
1516
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1617
"k8s.io/apimachinery/pkg/labels"
1718
"k8s.io/apimachinery/pkg/util/intstr"
19+
"k8s.io/utils/ptr"
1820

1921
"github.com/operator-framework/api/pkg/operators/v1alpha1"
2022
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
@@ -327,27 +329,27 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry,
327329

328330
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
329331
if err := c.ensureServiceAccount(source, overwrite); err != nil {
330-
return errors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
332+
return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
331333
}
332334
if err := c.ensureRole(source, overwrite); err != nil {
333-
return errors.Wrapf(err, "error ensuring role: %s", source.roleName())
335+
return pkgerrors.Wrapf(err, "error ensuring role: %s", source.roleName())
334336
}
335337
if err := c.ensureRoleBinding(source, overwrite); err != nil {
336-
return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
338+
return pkgerrors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
337339
}
338340
pod, err := source.Pod(image, defaultPodSecurityConfig)
339341
if err != nil {
340342
return err
341343
}
342344
if err := c.ensurePod(source, defaultPodSecurityConfig, overwritePod); err != nil {
343-
return errors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
345+
return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
344346
}
345347
service, err := source.Service()
346348
if err != nil {
347349
return err
348350
}
349351
if err := c.ensureService(source, overwrite); err != nil {
350-
return errors.Wrapf(err, "error ensuring service: %s", service.GetName())
352+
return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName())
351353
}
352354

353355
if overwritePod {
@@ -420,15 +422,15 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec
420422
}
421423
for _, p := range currentPods {
422424
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
423-
return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
425+
return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
424426
}
425427
}
426428
}
427429
_, err = c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
428430
if err == nil {
429431
return nil
430432
}
431-
return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
433+
return pkgerrors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
432434
}
433435

434436
func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourceDecorator, overwrite bool) error {
@@ -513,5 +515,36 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(logger *logrus.Entry,
513515
}
514516

515517
healthy = true
518+
if deadPodsDetected, e := detectAndDeleteDeadPods(logger, c.OpClient, pods, source.GetNamespace()); deadPodsDetected {
519+
healthy = false
520+
err = e
521+
}
522+
return
523+
}
524+
525+
// detectAndDeleteDeadPods determines if there are registry client pods that are in the deleted state
526+
// but have not been removed by GC (eg the node goes down before GC can remove them), and attempts to
527+
// force delete the pods
528+
func detectAndDeleteDeadPods(logger *logrus.Entry, client operatorclient.ClientInterface, pods []*corev1.Pod, sourceNamespace string) (foundDeadPod bool, err error) {
529+
var forceDeletionErrs []error
530+
foundDeadPod = false
531+
for _, pod := range pods {
532+
if !isPodDead(pod) {
533+
logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Debug("pod is alive")
534+
continue
535+
}
536+
foundDeadPod = true
537+
logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()}).Info("force deleting dead pod")
538+
if err := client.KubernetesInterface().CoreV1().Pods(sourceNamespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
539+
GracePeriodSeconds: ptr.To[int64](0),
540+
}); err != nil && !apierrors.IsNotFound(err) {
541+
forceDeletionErrs = append(forceDeletionErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName()))
542+
}
543+
}
544+
if len(forceDeletionErrs) > 0 {
545+
err = errors.Join(forceDeletionErrs...)
546+
} else {
547+
err = nil
548+
}
516549
return
517550
}

pkg/controller/registry/reconciler/configmap_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,55 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
527527
})
528528
}
529529
}
530+
531+
func TestConfigMapRegistryChecker(t *testing.T) {
532+
validConfigMap := validConfigMap()
533+
validCatalogSource := validConfigMapCatalogSource(validConfigMap)
534+
type cluster struct {
535+
k8sObjs []runtime.Object
536+
}
537+
type in struct {
538+
cluster cluster
539+
catsrc *v1alpha1.CatalogSource
540+
}
541+
type out struct {
542+
healthy bool
543+
err error
544+
}
545+
tests := []struct {
546+
testName string
547+
in in
548+
out out
549+
}{
550+
{
551+
testName: "ConfigMap/ExistingRegistry/DeadPod",
552+
in: in{
553+
cluster: cluster{
554+
k8sObjs: append(withPodDeletedButNotRemoved(objectsForCatalogSource(t, validCatalogSource)), validConfigMap),
555+
},
556+
catsrc: validCatalogSource,
557+
},
558+
out: out{
559+
healthy: false,
560+
},
561+
},
562+
}
563+
for _, tt := range tests {
564+
t.Run(tt.testName, func(t *testing.T) {
565+
stopc := make(chan struct{})
566+
defer close(stopc)
567+
568+
factory, _ := fakeReconcilerFactory(t, stopc, withK8sObjs(tt.in.cluster.k8sObjs...))
569+
rec := factory.ReconcilerForSource(tt.in.catsrc)
570+
571+
healthy, err := rec.CheckRegistryServer(logrus.NewEntry(logrus.New()), tt.in.catsrc)
572+
573+
require.Equal(t, tt.out.err, err)
574+
if tt.out.err != nil {
575+
return
576+
}
577+
578+
require.Equal(t, tt.out.healthy, healthy)
579+
})
580+
}
581+
}

pkg/controller/registry/reconciler/grpc.go

+8-28
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package reconciler
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"slices"
86
"strings"
97
"time"
108

@@ -24,7 +22,6 @@ import (
2422
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2523
"k8s.io/apimachinery/pkg/labels"
2624
"k8s.io/apimachinery/pkg/util/intstr"
27-
"k8s.io/utils/ptr"
2825
)
2926

3027
const (
@@ -347,31 +344,12 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
347344

348345
func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig, overwrite bool) error {
349346
// currentPods refers to the current pod instances of the catalog source
350-
currentPods := c.currentPods(logger, source)
351-
352-
var forceDeleteErrs []error
353-
currentPods = slices.DeleteFunc(currentPods, func(pod *corev1.Pod) bool {
354-
if !isPodDead(pod) {
355-
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Debug("pod is alive")
356-
return false
357-
}
358-
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Info("force deleting dead pod")
359-
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
360-
GracePeriodSeconds: ptr.To[int64](0),
361-
}); err != nil && !apierrors.IsNotFound(err) {
362-
forceDeleteErrs = append(forceDeleteErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName()))
363-
}
364-
return true
365-
})
366-
if len(forceDeleteErrs) > 0 {
367-
return errors.Join(forceDeleteErrs...)
368-
}
369-
370-
if len(currentPods) > 0 {
347+
currentLivePods := c.currentPods(logger, source)
348+
if len(currentLivePods) > 0 {
371349
if !overwrite {
372350
return nil
373351
}
374-
for _, p := range currentPods {
352+
for _, p := range currentLivePods {
375353
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": p.GetName()}).Info("deleting current pod")
376354
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
377355
return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
@@ -628,15 +606,17 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal
628606
if err != nil {
629607
return false, err
630608
}
631-
current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
609+
currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
632610
if err != nil {
633611
return false, err
634612
}
635-
if len(current) < 1 ||
613+
if len(currentPods) < 1 ||
636614
service == nil || c.currentServiceAccount(source) == nil {
637615
return false, nil
638616
}
639-
617+
if deadPodsDetected, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace()); deadPodsDetected {
618+
return false, e
619+
}
640620
return true, nil
641621
}
642622

pkg/controller/registry/reconciler/grpc_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package reconciler
22

33
import (
44
"context"
5+
"reflect"
56
"testing"
67
"time"
78

@@ -70,6 +71,25 @@ func grpcCatalogSourceWithName(name string) *v1alpha1.CatalogSource {
7071
return catsrc
7172
}
7273

74+
func withPodDeletedButNotRemoved(objs []runtime.Object) []runtime.Object {
75+
var out []runtime.Object
76+
t := reflect.TypeOf(&corev1.Pod{})
77+
for _, obj := range objs {
78+
o := obj.DeepCopyObject()
79+
if reflect.TypeOf(o) == t {
80+
pod, _ := obj.(*corev1.Pod)
81+
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
82+
pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
83+
Type: corev1.DisruptionTarget,
84+
Reason: "DeletionByTaintManager",
85+
Status: corev1.ConditionTrue,
86+
})
87+
o = pod
88+
}
89+
out = append(out, o)
90+
}
91+
return out
92+
}
7393
func TestGrpcRegistryReconciler(t *testing.T) {
7494
now := func() metav1.Time { return metav1.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC) }
7595
blockOwnerDeletion := true
@@ -558,6 +578,18 @@ func TestGrpcRegistryChecker(t *testing.T) {
558578
healthy: false,
559579
},
560580
},
581+
{
582+
testName: "Grpc/ExistingRegistry/Image/DeadPod",
583+
in: in{
584+
cluster: cluster{
585+
k8sObjs: withPodDeletedButNotRemoved(objectsForCatalogSource(t, validGrpcCatalogSource("test-img", ""))),
586+
},
587+
catsrc: validGrpcCatalogSource("test-img", ""),
588+
},
589+
out: out{
590+
healthy: false,
591+
},
592+
},
561593
{
562594
testName: "Grpc/ExistingRegistry/Image/OldPod/NotHealthy",
563595
in: in{

0 commit comments

Comments
 (0)