OCPBUGS-42150: (fix) registry pods do not come up again after node failure (#3366) #872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 changes: 1 addition & 1 deletion go.mod
@@ -28,7 +28,7 @@ require (
 	k8s.io/client-go v0.27.8
 	k8s.io/code-generator v0.27.8
 	k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
-	k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
+	k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3
 	sigs.k8s.io/controller-runtime v0.15.0
 	sigs.k8s.io/controller-tools v0.8.0
 )
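Note on this bump: the newer k8s.io/utils revision is presumably what provides the generic k8s.io/utils/ptr package imported by the reconciler changes below; the previously pinned 2023-02 revision predates it. A minimal, runnable sketch of the one helper this change relies on (nothing here beyond the ptr.To call that appears in the diff):

    package main

    import (
        "fmt"

        "k8s.io/utils/ptr"
    )

    func main() {
        // ptr.To returns a pointer to its argument; the reconciler uses it
        // to build the *int64 zero grace period for force deletion.
        grace := ptr.To[int64](0)
        fmt.Println(*grace) // prints 0
    }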
4 changes: 2 additions & 2 deletions go.sum
@@ -1484,8 +1484,8 @@ k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3
 k8s.io/kubectl v0.27.8 h1:VipG0f9E1kGRGJYm2/kNv188RgDduvx1g2q1b20niHg=
 k8s.io/kubectl v0.27.8/go.mod h1:ZufZqfI5V7oBuGFALJHoTxypO0fewOwbadr4saUkRKo=
 k8s.io/utils v0.0.0-20200324210504-a9aa75ae1b89/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
-k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 h1:kmDqav+P+/5e1i9tFfHq1qcF3sOrDp+YEkVDAHu7Jwk=
-k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
+k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 h1:b2FmK8YH+QEwq/Sy2uAEhmqL5nPfGYbJOcaqjeYYZoA=
+k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
 oras.land/oras-go v1.2.4 h1:djpBY2/2Cs1PV87GSJlxv4voajVOMZxqqtq9AB8YNvY=
 oras.land/oras-go v1.2.4/go.mod h1:DYcGfb3YF1nKjcezfX2SNlDAeQFKSXmf+qrFmrh4324=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
2 changes: 1 addition & 1 deletion staging/operator-lifecycle-manager/go.mod
@@ -52,7 +52,7 @@ require (
 	k8s.io/klog/v2 v2.90.1
 	k8s.io/kube-aggregator v0.25.3
 	k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f
-	k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
+	k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3
 	sigs.k8s.io/controller-runtime v0.15.0
 	sigs.k8s.io/controller-tools v0.8.0
 	sigs.k8s.io/kind v0.20.0
4 changes: 2 additions & 2 deletions staging/operator-lifecycle-manager/go.sum
@@ -1347,8 +1347,8 @@ k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5F
 k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f/go.mod h1:byini6yhqGC14c3ebc/QwanvYwhuMWF6yz2F8uwW8eg=
 k8s.io/kubectl v0.27.7 h1:HTEDa4s/oWjB3t5ysdW1yKlcNl9bzigcqWBq0LIIe3k=
 k8s.io/kubectl v0.27.7/go.mod h1:Xb1Ubc8uN1i2RvSN1HCgSHTtzgX0woihMk/gW7XbjJU=
-k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 h1:kmDqav+P+/5e1i9tFfHq1qcF3sOrDp+YEkVDAHu7Jwk=
-k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
+k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 h1:b2FmK8YH+QEwq/Sy2uAEhmqL5nPfGYbJOcaqjeYYZoA=
+k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
 oras.land/oras-go v1.2.3 h1:v8PJl+gEAntI1pJ/LCrDgsuk+1PKVavVEPsYIHFE5uY=
 oras.land/oras-go v1.2.3/go.mod h1:M/uaPdYklze0Vf3AakfarnpoEckvw0ESbRdN8Z1vdJg=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/configmap.go
@@ -3,16 +3,18 @@ package reconciler
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
-	"github.com/pkg/errors"
+	pkgerrors "github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/utils/ptr"
 
 	"github.com/operator-framework/api/pkg/operators/v1alpha1"
 	"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
@@ -284,19 +286,19 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph
 
 	//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
 	if err := c.ensureServiceAccount(source, overwrite); err != nil {
-		return errors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
+		return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
 	}
 	if err := c.ensureRole(source, overwrite); err != nil {
-		return errors.Wrapf(err, "error ensuring role: %s", source.roleName())
+		return pkgerrors.Wrapf(err, "error ensuring role: %s", source.roleName())
 	}
 	if err := c.ensureRoleBinding(source, overwrite); err != nil {
-		return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
+		return pkgerrors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
 	}
 	if err := c.ensurePod(source, overwritePod); err != nil {
-		return errors.Wrapf(err, "error ensuring pod: %s", source.Pod(image).GetName())
+		return pkgerrors.Wrapf(err, "error ensuring pod: %s", source.Pod(image).GetName())
 	}
 	if err := c.ensureService(source, overwrite); err != nil {
-		return errors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
+		return pkgerrors.Wrapf(err, "error ensuring service: %s", source.Service().GetName())
 	}
 
 	if overwritePod {
@@ -363,15 +365,15 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec
 		}
 		for _, p := range currentPods {
 			if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
-				return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
+				return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
 			}
 		}
 	}
 	_, err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
 	if err == nil {
 		return nil
 	}
-	return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
+	return pkgerrors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
 }
 
 func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourceDecorator, overwrite bool) error {
@@ -390,16 +392,15 @@ func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourc
 }
 
 // CheckRegistryServer returns true if the given CatalogSource is considered healthy; false otherwise.
-func (c *ConfigMapRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (healthy bool, err error) {
+func (c *ConfigMapRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (bool, error) {
 	source := configMapCatalogSourceDecorator{catalogSource, c.createPodAsUser}
 
 	image := c.Image
 	if source.Spec.SourceType == "grpc" {
 		image = source.Spec.Image
 	}
 	if image == "" {
-		err = fmt.Errorf("no image for registry")
-		return
+		return false, fmt.Errorf("no image for registry")
 	}
 
 	if source.Spec.SourceType == v1alpha1.SourceTypeConfigmap || source.Spec.SourceType == v1alpha1.SourceTypeInternal {
@@ -426,10 +427,59 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha
 		c.currentRoleBinding(source) == nil ||
 		c.currentService(source) == nil ||
 		len(c.currentPods(source, c.Image)) < 1 {
-		healthy = false
-		return
+
+		return false, nil
 	}
 
-	healthy = true
-	return
+	podsAreLive, e := detectAndDeleteDeadPods(c.OpClient, c.currentPods(source, c.Image), source.GetNamespace())
+	if e != nil {
+		return false, fmt.Errorf("error deleting dead pods: %v", e)
+	}
+	return podsAreLive, nil
 }
+
+// detectAndDeleteDeadPods determines whether there are registry client pods that are in the deleted state
+// but have not been removed by GC (e.g. the node went down before GC could remove them), and attempts to
+// force delete those pods. If any live registry pods remain, it returns true; otherwise it returns false.
+func detectAndDeleteDeadPods(client operatorclient.ClientInterface, pods []*corev1.Pod, sourceNamespace string) (bool, error) {
+	var forceDeletionErrs []error
+	livePodFound := false
+	for _, pod := range pods {
+		if !isPodDead(pod) {
+			livePodFound = true
+			continue
+		}
+		if err := client.KubernetesInterface().CoreV1().Pods(sourceNamespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
+			GracePeriodSeconds: ptr.To[int64](0),
+		}); err != nil && !apierrors.IsNotFound(err) {
+			forceDeletionErrs = append(forceDeletionErrs, err)
+		}
+	}
+	if len(forceDeletionErrs) > 0 {
+		return false, errors.Join(forceDeletionErrs...)
+	}
+	return livePodFound, nil
+}
+
+func isPodDead(pod *corev1.Pod) bool {
+	for _, check := range []func(*corev1.Pod) bool{
+		isPodDeletedByTaintManager,
+	} {
+		if check(pod) {
+			return true
+		}
+	}
+	return false
+}
+
+func isPodDeletedByTaintManager(pod *corev1.Pod) bool {
+	if pod.DeletionTimestamp == nil {
+		return false
+	}
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == corev1.DisruptionTarget && condition.Reason == "DeletionByTaintManager" && condition.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
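For context, the pod state these new checks match looks roughly like the following — a sketch assembled from the fields isPodDeletedByTaintManager inspects, with a hypothetical pod name, not code taken from the PR:

    // A pod stranded by node failure: the taint manager set a deletion
    // timestamp and a DisruptionTarget condition, but the object lingers
    // because the kubelet on the dead node never confirms termination.
    deadPod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            Name:              "registry-abc123", // hypothetical name
            DeletionTimestamp: &metav1.Time{Time: time.Now()},
        },
        Status: corev1.PodStatus{
            Conditions: []corev1.PodCondition{{
                Type:   corev1.DisruptionTarget,
                Reason: "DeletionByTaintManager",
                Status: corev1.ConditionTrue,
            }},
        },
    }
    // isPodDead(deadPod) returns true, so detectAndDeleteDeadPods force-deletes
    // it with a zero grace period and reports whether any live pods remain.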
staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/configmap_test.go
@@ -487,3 +487,55 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
 		})
 	}
 }
+
+func TestConfigMapRegistryChecker(t *testing.T) {
+	validConfigMap := validConfigMap()
+	validCatalogSource := validConfigMapCatalogSource(validConfigMap)
+	type cluster struct {
+		k8sObjs []runtime.Object
+	}
+	type in struct {
+		cluster cluster
+		catsrc  *v1alpha1.CatalogSource
+	}
+	type out struct {
+		healthy bool
+		err     error
+	}
+	tests := []struct {
+		testName string
+		in       in
+		out      out
+	}{
+		{
+			testName: "ConfigMap/ExistingRegistry/DeadPod",
+			in: in{
+				cluster: cluster{
+					k8sObjs: append(withPodDeletedButNotRemoved(objectsForCatalogSource(validCatalogSource)), validConfigMap),
+				},
+				catsrc: validCatalogSource,
+			},
+			out: out{
+				healthy: false,
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.testName, func(t *testing.T) {
+			stopc := make(chan struct{})
+			defer close(stopc)
+
+			factory, _ := fakeReconcilerFactory(t, stopc, withK8sObjs(tt.in.cluster.k8sObjs...))
+			rec := factory.ReconcilerForSource(tt.in.catsrc)
+
+			healthy, err := rec.CheckRegistryServer(tt.in.catsrc)
+
+			require.Equal(t, tt.out.err, err)
+			if tt.out.err != nil {
+				return
+			}
+
+			require.Equal(t, tt.out.healthy, healthy)
+		})
+	}
+}
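A side note on the assertion style (a reviewer suggestion, not part of the change): require.Equal compares error values structurally, so the check would break if CheckRegistryServer ever started wrapping its errors. Matching with testify's require.ErrorIs is more tolerant:

    // Sketch: assert on error identity rather than structural equality.
    if tt.out.err != nil {
        require.ErrorIs(t, err, tt.out.err)
        return
    }
    require.NoError(t, err)
    require.Equal(t, tt.out.healthy, healthy)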
staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -256,13 +257,13 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
 }
 
 func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
-	// currentLivePods refers to the currently live instances of the catalog source
-	currentLivePods := c.currentPods(source)
-	if len(currentLivePods) > 0 {
+	// currentPods refers to the current pod instances of the catalog source
+	currentPods := c.currentPods(source)
+	if len(currentPods) > 0 {
 		if !overwrite {
 			return nil
 		}
-		for _, p := range currentLivePods {
+		for _, p := range currentPods {
 			if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
 				return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
 			}
@@ -448,18 +449,20 @@ func (c *GrpcRegistryReconciler) removePods(pods []*corev1.Pod, namespace string
 }
 
 // CheckRegistryServer returns true if the given CatalogSource is considered healthy; false otherwise.
-func (c *GrpcRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (healthy bool, err error) {
+func (c *GrpcRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha1.CatalogSource) (bool, error) {
 	source := grpcCatalogSourceDecorator{catalogSource, c.createPodAsUser}
 	// Check on registry resources
 	// TODO: add gRPC health check
-	if len(c.currentPodsWithCorrectImageAndSpec(source, source.ServiceAccount().GetName())) < 1 ||
+	currentPods := c.currentPodsWithCorrectImageAndSpec(source, source.ServiceAccount().Name)
+	if len(currentPods) < 1 ||
 		c.currentService(source) == nil || c.currentServiceAccount(source) == nil {
-		healthy = false
-		return
+		return false, nil
 	}
 
-	healthy = true
-	return
+	podsAreLive, e := detectAndDeleteDeadPods(c.OpClient, currentPods, source.GetNamespace())
+	if e != nil {
+		return false, fmt.Errorf("error deleting dead pods: %v", e)
+	}
+	return podsAreLive, nil
 }
 
 // promoteCatalog swaps the labels on the update pod so that the update pod is now reachable by the catalog service.
staging/operator-lifecycle-manager/pkg/controller/registry/reconciler/grpc_test.go
@@ -61,6 +61,23 @@ func grpcCatalogSourceWithAnnotations(annotations map[string]string) *v1alpha1.C
 	return catsrc
 }
 
+func withPodDeletedButNotRemoved(objs []runtime.Object) []runtime.Object {
+	var out []runtime.Object
+	for _, obj := range objs {
+		o := obj.DeepCopyObject()
+		// Mutate the deep copy rather than the caller's object.
+		if pod, ok := o.(*corev1.Pod); ok {
+			pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+			pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
+				Type:   corev1.DisruptionTarget,
+				Reason: "DeletionByTaintManager",
+				Status: corev1.ConditionTrue,
+			})
+		}
+		out = append(out, o)
+	}
+	return out
+}
 func TestGrpcRegistryReconciler(t *testing.T) {
 	now := func() metav1.Time { return metav1.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC) }
 	blockOwnerDeletion := true
@@ -508,6 +525,18 @@ func TestGrpcRegistryChecker(t *testing.T) {
 				healthy: false,
 			},
 		},
+		{
+			testName: "Grpc/ExistingRegistry/Image/DeadPod",
+			in: in{
+				cluster: cluster{
+					k8sObjs: withPodDeletedButNotRemoved(objectsForCatalogSource(validGrpcCatalogSource("test-img", ""))),
+				},
+				catsrc: validGrpcCatalogSource("test-img", ""),
+			},
+			out: out{
+				healthy: false,
+			},
+		},
 		{
 			testName: "Grpc/ExistingRegistry/Image/OldPod/NotHealthy",
 			in: in{