From 82f499723e52e85f28653af0610b6e7feff096cf Mon Sep 17 00:00:00 2001
From: Joe Lanford
Date: Thu, 11 Apr 2024 12:45:52 -0400
Subject: [PATCH] catalog-operator: delete catalog pods stuck in Terminating
 state due to unreachable node

Signed-off-by: Joe Lanford
---
 Dockerfile                                 |  4 +-
 pkg/controller/registry/reconciler/grpc.go | 77 +++++++++++++++++-----
 2 files changed, 63 insertions(+), 18 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 1ed3c19c3d..c4bc9e22b9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -2,11 +2,11 @@ FROM quay.io/fedora/fedora:37-x86_64 as builder
 LABEL stage=builder
 WORKDIR /build
 
-# install dependencies and go 1.16
+# install dependencies and go 1.21
 
 # copy just enough of the git repo to parse HEAD, used to record version in OLM binaries
 RUN dnf update -y && dnf install -y bash make git mercurial jq wget && dnf upgrade -y
-RUN curl -sSL https://go.dev/dl/go1.20.linux-amd64.tar.gz | tar -xzf - -C /usr/local
+RUN curl -sSL https://go.dev/dl/go1.21.9.linux-amd64.tar.gz | tar -xzf - -C /usr/local
 ENV PATH=/usr/local/go/bin:$PATH
 COPY .git/HEAD .git/HEAD
 COPY .git/refs/heads/. .git/refs/heads
diff --git a/pkg/controller/registry/reconciler/grpc.go b/pkg/controller/registry/reconciler/grpc.go
index 8864e1acf8..4981600256 100644
--- a/pkg/controller/registry/reconciler/grpc.go
+++ b/pkg/controller/registry/reconciler/grpc.go
@@ -2,19 +2,22 @@ package reconciler
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"slices"
 	"strings"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/operator-framework/api/pkg/operators/v1alpha1"
-	"github.com/pkg/errors"
+	pkgerrors "github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/utils/ptr"
 
 	"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
 	controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
@@ -262,7 +265,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
 	//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
 	sa, err := c.ensureSA(source)
 	if err != nil && !apierrors.IsAlreadyExists(err) {
-		return errors.Wrapf(err, "error ensuring service account: %s", source.GetName())
+		return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.GetName())
 	}
 
 	sa, err = c.OpClient.GetServiceAccount(sa.GetNamespace(), sa.GetName())
@@ -285,20 +288,20 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
 		return err
 	}
 	if err := c.ensurePod(logger, source, sa, overwritePod); err != nil {
-		return errors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
+		return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
 	}
 	if err := c.ensureUpdatePod(logger, sa, source); err != nil {
 		if _, ok := err.(UpdateNotReadyErr); ok {
 			return err
 		}
-		return errors.Wrapf(err, "error ensuring updated catalog source pod: %s", pod.GetName())
+		return pkgerrors.Wrapf(err, "error ensuring updated catalog source pod: %s", pod.GetName())
 	}
 	service, err := source.Service()
 	if err != nil {
 		return err
 	}
 	if err := c.ensureService(source, overwrite); err != nil {
-		return errors.Wrapf(err, "error ensuring service: %s", service.GetName())
+		return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName())
 	}
 
 	if overwritePod {
@@ -338,16 +341,35 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
 }
 
 func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, overwrite bool) error {
-	// currentLivePods refers to the currently live instances of the catalog source
-	currentLivePods := c.currentPods(logger, source)
-	if len(currentLivePods) > 0 {
+	// currentPods refers to the current pod instances of the catalog source
+	currentPods := c.currentPods(logger, source)
+
+	var forceDeleteErrs []error
+	currentPods = slices.DeleteFunc(currentPods, func(pod *corev1.Pod) bool {
+		if !isPodDead(pod) {
+			logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Debug("pod is alive")
+			return false
+		}
+		logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Info("force deleting dead pod")
+		if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
+			GracePeriodSeconds: ptr.To[int64](0),
+		}); err != nil && !apierrors.IsNotFound(err) {
+			forceDeleteErrs = append(forceDeleteErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName()))
+		}
+		return true
+	})
+	if len(forceDeleteErrs) > 0 {
+		return errors.Join(forceDeleteErrs...)
+	}
+
+	if len(currentPods) > 0 {
 		if !overwrite {
 			return nil
 		}
-		for _, p := range currentLivePods {
+		for _, p := range currentPods {
 			logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": p.GetName()}).Info("deleting current pod")
 			if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
-				return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
+				return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
 			}
 		}
 	}
@@ -358,7 +380,7 @@ func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCata
 	logger.WithFields(logrus.Fields{"pod.namespace": desiredPod.GetNamespace(), "pod.name": desiredPod.GetName()}).Info("creating desired pod")
 	_, err = c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Create(context.TODO(), desiredPod, metav1.CreateOptions{})
 	if err != nil {
-		return errors.Wrapf(err, "error creating new pod: %s", desiredPod.GetGenerateName())
+		return pkgerrors.Wrapf(err, "error creating new pod: %s", desiredPod.GetGenerateName())
 	}
 
 	return nil
@@ -378,7 +400,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
 		logger.Infof("catalog update required at %s", time.Now().String())
 		pod, err := c.createUpdatePod(source, serviceAccount)
 		if err != nil {
-			return errors.Wrapf(err, "creating update catalog source pod")
+			return pkgerrors.Wrapf(err, "creating update catalog source pod")
 		}
 		source.SetLastUpdateTime()
 		return UpdateNotReadyErr{catalogName: source.GetName(), podName: pod.GetName()}
@@ -410,7 +432,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
 		for _, p := range currentLivePods {
 			logger.WithFields(logrus.Fields{"live-pod.namespace": source.GetNamespace(), "live-pod.name": p.Name}).Info("deleting current live pods")
 			if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
-				return errors.Wrapf(errors.Wrapf(err, "error deleting pod: %s", p.GetName()), "detected imageID change: error deleting old catalog source pod")
+				return pkgerrors.Wrapf(pkgerrors.Wrapf(err, "error deleting pod: %s", p.GetName()), "detected imageID change: error deleting old catalog source pod")
 			}
 		}
 		// done syncing
@@ -420,7 +442,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
 		// delete update pod right away, since the digest match, to prevent long-lived duplicate catalog pods
 		logger.WithFields(logrus.Fields{"update-pod.namespace": updatePod.Namespace, "update-pod.name": updatePod.Name}).Debug("catalog polling result: no update; removing duplicate update pod")
 		if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), updatePod.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
-			return errors.Wrapf(errors.Wrapf(err, "error deleting pod: %s", updatePod.GetName()), "duplicate catalog polling pod")
+			return pkgerrors.Wrapf(pkgerrors.Wrapf(err, "error deleting pod: %s", updatePod.GetName()), "duplicate catalog polling pod")
 		}
 	}
 
@@ -523,6 +545,29 @@ func imageChanged(logger *logrus.Entry, updatePod *corev1.Pod, servingPods []*co
 	return false
 }
 
+func isPodDead(pod *corev1.Pod) bool {
+	for _, check := range []func(*corev1.Pod) bool{
+		isPodDeletedByTaintManager,
+	} {
+		if check(pod) {
+			return true
+		}
+	}
+	return false
+}
+
+func isPodDeletedByTaintManager(pod *corev1.Pod) bool {
+	if pod.DeletionTimestamp == nil {
+		return false
+	}
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == corev1.DisruptionTarget && condition.Reason == "DeletionByTaintManager" && condition.Status == corev1.ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
+
 // imageID returns the ImageID of the primary catalog source container or an empty string if the image ID isn't available yet.
 // Note: the pod must be running and the container in a ready status to return a valid ImageID.
 func imageID(pod *corev1.Pod) string {
@@ -545,7 +590,7 @@ func imageID(pod *corev1.Pod) string {
 func (c *GrpcRegistryReconciler) removePods(pods []*corev1.Pod, namespace string) error {
 	for _, p := range pods {
 		if err := c.OpClient.KubernetesInterface().CoreV1().Pods(namespace).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
-			return errors.Wrapf(err, "error deleting pod: %s", p.GetName())
+			return pkgerrors.Wrapf(err, "error deleting pod: %s", p.GetName())
 		}
 	}
 	return nil
@@ -623,7 +668,7 @@ func (c *GrpcRegistryReconciler) podFailed(pod *corev1.Pod) (bool, error) {
 		logrus.WithField("UpdatePod", pod.GetName()).Infof("catalog polling result: update pod %s failed to start", pod.GetName())
 		err := c.removePods([]*corev1.Pod{pod}, pod.GetNamespace())
 		if err != nil {
-			return true, errors.Wrapf(err, "error deleting failed catalog polling pod: %s", pod.GetName())
+			return true, pkgerrors.Wrapf(err, "error deleting failed catalog polling pod: %s", pod.GetName())
 		}
 		return true, nil
 	}