@@ -2,19 +2,22 @@ package reconciler
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
7
+ "slices"
6
8
"strings"
7
9
"time"
8
10
9
11
"github.com/google/go-cmp/cmp"
10
12
"github.com/operator-framework/api/pkg/operators/v1alpha1"
11
- "github.com/pkg/errors"
13
+ pkgerrors "github.com/pkg/errors"
12
14
"github.com/sirupsen/logrus"
13
15
corev1 "k8s.io/api/core/v1"
14
16
apierrors "k8s.io/apimachinery/pkg/api/errors"
15
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16
18
"k8s.io/apimachinery/pkg/labels"
17
19
"k8s.io/apimachinery/pkg/util/intstr"
20
+ "k8s.io/utils/ptr"
18
21
19
22
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
20
23
controllerclient "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/controller-runtime/client"
@@ -262,7 +265,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
262
265
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
263
266
sa , err := c .ensureSA (source )
264
267
if err != nil && ! apierrors .IsAlreadyExists (err ) {
265
- return errors .Wrapf (err , "error ensuring service account: %s" , source .GetName ())
268
+ return pkgerrors .Wrapf (err , "error ensuring service account: %s" , source .GetName ())
266
269
}
267
270
268
271
sa , err = c .OpClient .GetServiceAccount (sa .GetNamespace (), sa .GetName ())
@@ -285,20 +288,20 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry, cata
285
288
return err
286
289
}
287
290
if err := c .ensurePod (logger , source , sa , overwritePod ); err != nil {
288
- return errors .Wrapf (err , "error ensuring pod: %s" , pod .GetName ())
291
+ return pkgerrors .Wrapf (err , "error ensuring pod: %s" , pod .GetName ())
289
292
}
290
293
if err := c .ensureUpdatePod (logger , sa , source ); err != nil {
291
294
if _ , ok := err .(UpdateNotReadyErr ); ok {
292
295
return err
293
296
}
294
- return errors .Wrapf (err , "error ensuring updated catalog source pod: %s" , pod .GetName ())
297
+ return pkgerrors .Wrapf (err , "error ensuring updated catalog source pod: %s" , pod .GetName ())
295
298
}
296
299
service , err := source .Service ()
297
300
if err != nil {
298
301
return err
299
302
}
300
303
if err := c .ensureService (source , overwrite ); err != nil {
301
- return errors .Wrapf (err , "error ensuring service: %s" , service .GetName ())
304
+ return pkgerrors .Wrapf (err , "error ensuring service: %s" , service .GetName ())
302
305
}
303
306
304
307
if overwritePod {
@@ -338,16 +341,35 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
338
341
}
339
342
340
343
func (c * GrpcRegistryReconciler ) ensurePod (logger * logrus.Entry , source grpcCatalogSourceDecorator , serviceAccount * corev1.ServiceAccount , overwrite bool ) error {
341
- // currentLivePods refers to the currently live instances of the catalog source
342
- currentLivePods := c .currentPods (logger , source )
343
- if len (currentLivePods ) > 0 {
344
+ // currentPods refers to the current pod instances of the catalog source
345
+ currentPods := c .currentPods (logger , source )
346
+
347
+ var forceDeleteErrs []error
348
+ currentPods = slices .DeleteFunc (currentPods , func (pod * corev1.Pod ) bool {
349
+ if ! isPodDead (pod ) {
350
+ logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : pod .GetName ()}).Debug ("pod is alive" )
351
+ return false
352
+ }
353
+ logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : pod .GetName ()}).Info ("force deleting dead pod" )
354
+ if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), pod .GetName (), metav1.DeleteOptions {
355
+ GracePeriodSeconds : ptr.To [int64 ](0 ),
356
+ }); err != nil && ! apierrors .IsNotFound (err ) {
357
+ forceDeleteErrs = append (forceDeleteErrs , pkgerrors .Wrapf (err , "error deleting old pod: %s" , pod .GetName ()))
358
+ }
359
+ return true
360
+ })
361
+ if len (forceDeleteErrs ) > 0 {
362
+ return errors .Join (forceDeleteErrs ... )
363
+ }
364
+
365
+ if len (currentPods ) > 0 {
344
366
if ! overwrite {
345
367
return nil
346
368
}
347
- for _ , p := range currentLivePods {
369
+ for _ , p := range currentPods {
348
370
logger .WithFields (logrus.Fields {"pod.namespace" : source .GetNamespace (), "pod.name" : p .GetName ()}).Info ("deleting current pod" )
349
371
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
350
- return errors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
372
+ return pkgerrors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
351
373
}
352
374
}
353
375
}
@@ -358,7 +380,7 @@ func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCata
358
380
logger .WithFields (logrus.Fields {"pod.namespace" : desiredPod .GetNamespace (), "pod.name" : desiredPod .GetName ()}).Info ("creating desired pod" )
359
381
_ , err = c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Create (context .TODO (), desiredPod , metav1.CreateOptions {})
360
382
if err != nil {
361
- return errors .Wrapf (err , "error creating new pod: %s" , desiredPod .GetGenerateName ())
383
+ return pkgerrors .Wrapf (err , "error creating new pod: %s" , desiredPod .GetGenerateName ())
362
384
}
363
385
364
386
return nil
@@ -378,7 +400,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
378
400
logger .Infof ("catalog update required at %s" , time .Now ().String ())
379
401
pod , err := c .createUpdatePod (source , serviceAccount )
380
402
if err != nil {
381
- return errors .Wrapf (err , "creating update catalog source pod" )
403
+ return pkgerrors .Wrapf (err , "creating update catalog source pod" )
382
404
}
383
405
source .SetLastUpdateTime ()
384
406
return UpdateNotReadyErr {catalogName : source .GetName (), podName : pod .GetName ()}
@@ -410,7 +432,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
410
432
for _ , p := range currentLivePods {
411
433
logger .WithFields (logrus.Fields {"live-pod.namespace" : source .GetNamespace (), "live-pod.name" : p .Name }).Info ("deleting current live pods" )
412
434
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
413
- return errors .Wrapf (errors .Wrapf (err , "error deleting pod: %s" , p .GetName ()), "detected imageID change: error deleting old catalog source pod" )
435
+ return pkgerrors .Wrapf (pkgerrors .Wrapf (err , "error deleting pod: %s" , p .GetName ()), "detected imageID change: error deleting old catalog source pod" )
414
436
}
415
437
}
416
438
// done syncing
@@ -420,7 +442,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(logger *logrus.Entry, serviceAc
420
442
// delete the update pod right away, since the digests match, to prevent long-lived duplicate catalog pods
421
443
logger .WithFields (logrus.Fields {"update-pod.namespace" : updatePod .Namespace , "update-pod.name" : updatePod .Name }).Debug ("catalog polling result: no update; removing duplicate update pod" )
422
444
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Delete (context .TODO (), updatePod .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
423
- return errors .Wrapf (errors .Wrapf (err , "error deleting pod: %s" , updatePod .GetName ()), "duplicate catalog polling pod" )
445
+ return pkgerrors .Wrapf (pkgerrors .Wrapf (err , "error deleting pod: %s" , updatePod .GetName ()), "duplicate catalog polling pod" )
424
446
}
425
447
}
426
448
@@ -523,6 +545,29 @@ func imageChanged(logger *logrus.Entry, updatePod *corev1.Pod, servingPods []*co
523
545
return false
524
546
}
525
547
548
+ func isPodDead (pod * corev1.Pod ) bool {
549
+ for _ , check := range []func (* corev1.Pod ) bool {
550
+ isPodDeletedByTaintManager ,
551
+ } {
552
+ if check (pod ) {
553
+ return true
554
+ }
555
+ }
556
+ return false
557
+ }
558
+
559
+ func isPodDeletedByTaintManager (pod * corev1.Pod ) bool {
560
+ if pod .DeletionTimestamp == nil {
561
+ return false
562
+ }
563
+ for _ , condition := range pod .Status .Conditions {
564
+ if condition .Type == corev1 .DisruptionTarget && condition .Reason == "DeletionByTaintManager" && condition .Status == corev1 .ConditionTrue {
565
+ return true
566
+ }
567
+ }
568
+ return false
569
+ }
570
+
526
571
// imageID returns the ImageID of the primary catalog source container or an empty string if the image ID isn't available yet.
527
572
// Note: the pod must be running and the container in a ready status to return a valid ImageID.
528
573
func imageID (pod * corev1.Pod ) string {
@@ -545,7 +590,7 @@ func imageID(pod *corev1.Pod) string {
545
590
func (c * GrpcRegistryReconciler ) removePods (pods []* corev1.Pod , namespace string ) error {
546
591
for _ , p := range pods {
547
592
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (namespace ).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
548
- return errors .Wrapf (err , "error deleting pod: %s" , p .GetName ())
593
+ return pkgerrors .Wrapf (err , "error deleting pod: %s" , p .GetName ())
549
594
}
550
595
}
551
596
return nil
@@ -623,7 +668,7 @@ func (c *GrpcRegistryReconciler) podFailed(pod *corev1.Pod) (bool, error) {
623
668
logrus .WithField ("UpdatePod" , pod .GetName ()).Infof ("catalog polling result: update pod %s failed to start" , pod .GetName ())
624
669
err := c .removePods ([]* corev1.Pod {pod }, pod .GetNamespace ())
625
670
if err != nil {
626
- return true , errors .Wrapf (err , "error deleting failed catalog polling pod: %s" , pod .GetName ())
671
+ return true , pkgerrors .Wrapf (err , "error deleting failed catalog polling pod: %s" , pod .GetName ())
627
672
}
628
673
return true , nil
629
674
}
0 commit comments