@@ -3,16 +3,18 @@ package reconciler
3
3
4
4
import (
5
5
"context"
6
+ "errors"
6
7
"fmt"
7
8
8
- "github.com/pkg/errors"
9
+ pkgerrors "github.com/pkg/errors"
9
10
"github.com/sirupsen/logrus"
10
11
corev1 "k8s.io/api/core/v1"
11
12
rbacv1 "k8s.io/api/rbac/v1"
12
13
apierrors "k8s.io/apimachinery/pkg/api/errors"
13
14
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
15
"k8s.io/apimachinery/pkg/labels"
15
16
"k8s.io/apimachinery/pkg/util/intstr"
17
+ "k8s.io/utils/ptr"
16
18
17
19
"github.com/operator-framework/api/pkg/operators/v1alpha1"
18
20
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
@@ -284,19 +286,19 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(catalogSource *v1alph
284
286
285
287
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
286
288
if err := c .ensureServiceAccount (source , overwrite ); err != nil {
287
- return errors .Wrapf (err , "error ensuring service account: %s" , source .serviceAccountName ())
289
+ return pkgerrors .Wrapf (err , "error ensuring service account: %s" , source .serviceAccountName ())
288
290
}
289
291
if err := c .ensureRole (source , overwrite ); err != nil {
290
- return errors .Wrapf (err , "error ensuring role: %s" , source .roleName ())
292
+ return pkgerrors .Wrapf (err , "error ensuring role: %s" , source .roleName ())
291
293
}
292
294
if err := c .ensureRoleBinding (source , overwrite ); err != nil {
293
- return errors .Wrapf (err , "error ensuring rolebinding: %s" , source .RoleBinding ().GetName ())
295
+ return pkgerrors .Wrapf (err , "error ensuring rolebinding: %s" , source .RoleBinding ().GetName ())
294
296
}
295
297
if err := c .ensurePod (source , overwritePod ); err != nil {
296
- return errors .Wrapf (err , "error ensuring pod: %s" , source .Pod (image ).GetName ())
298
+ return pkgerrors .Wrapf (err , "error ensuring pod: %s" , source .Pod (image ).GetName ())
297
299
}
298
300
if err := c .ensureService (source , overwrite ); err != nil {
299
- return errors .Wrapf (err , "error ensuring service: %s" , source .Service ().GetName ())
301
+ return pkgerrors .Wrapf (err , "error ensuring service: %s" , source .Service ().GetName ())
300
302
}
301
303
302
304
if overwritePod {
@@ -363,15 +365,15 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec
363
365
}
364
366
for _ , p := range currentPods {
365
367
if err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (pod .GetNamespace ()).Delete (context .TODO (), p .GetName (), * metav1 .NewDeleteOptions (1 )); err != nil && ! apierrors .IsNotFound (err ) {
366
- return errors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
368
+ return pkgerrors .Wrapf (err , "error deleting old pod: %s" , p .GetName ())
367
369
}
368
370
}
369
371
}
370
372
_ , err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (pod .GetNamespace ()).Create (context .TODO (), pod , metav1.CreateOptions {})
371
373
if err == nil {
372
374
return nil
373
375
}
374
- return errors .Wrapf (err , "error creating new pod: %s" , pod .GetGenerateName ())
376
+ return pkgerrors .Wrapf (err , "error creating new pod: %s" , pod .GetGenerateName ())
375
377
}
376
378
377
379
func (c * ConfigMapRegistryReconciler ) ensureService (source configMapCatalogSourceDecorator , overwrite bool ) error {
@@ -390,16 +392,15 @@ func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourc
390
392
}
391
393
392
394
// CheckRegistryServer returns true if the given CatalogSource is considered healthy; false otherwise.
393
- func (c * ConfigMapRegistryReconciler ) CheckRegistryServer (catalogSource * v1alpha1.CatalogSource ) (healthy bool , err error ) {
395
+ func (c * ConfigMapRegistryReconciler ) CheckRegistryServer (catalogSource * v1alpha1.CatalogSource ) (bool , error ) {
394
396
source := configMapCatalogSourceDecorator {catalogSource , c .createPodAsUser }
395
-
396
397
image := c .Image
397
398
if source .Spec .SourceType == "grpc" {
398
399
image = source .Spec .Image
399
400
}
400
401
if image == "" {
401
- err = fmt .Errorf ("no image for registry" )
402
- return
402
+ err : = fmt .Errorf ("no image for registry" )
403
+ return false , err
403
404
}
404
405
405
406
if source .Spec .SourceType == v1alpha1 .SourceTypeConfigmap || source .Spec .SourceType == v1alpha1 .SourceTypeInternal {
@@ -426,10 +427,59 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(catalogSource *v1alpha
426
427
c .currentRoleBinding (source ) == nil ||
427
428
c .currentService (source ) == nil ||
428
429
len (c .currentPods (source , c .Image )) < 1 {
429
- healthy = false
430
- return
430
+
431
+ return false , nil
431
432
}
432
433
433
- healthy = true
434
- return
434
+ podsAreLive , e := detectAndDeleteDeadPods (c .OpClient , c .currentPods (source , c .Image ), source .GetNamespace ())
435
+ if e != nil {
436
+ return false , fmt .Errorf ("error deleting dead pods: %v" , e )
437
+ }
438
+ return podsAreLive , nil
439
+ }
440
+
441
+ // detectAndDeleteDeadPods determines if there are registry client pods that are in the deleted state
442
+ // but have not been removed by GC (eg the node goes down before GC can remove them), and attempts to
443
+ // force delete the pods. If there are live registry pods remaining, it returns true, otherwise returns false.
444
+ func detectAndDeleteDeadPods (client operatorclient.ClientInterface , pods []* corev1.Pod , sourceNamespace string ) (bool , error ) {
445
+ var forceDeletionErrs []error
446
+ livePodFound := false
447
+ for _ , pod := range pods {
448
+ if ! isPodDead (pod ) {
449
+ livePodFound = true
450
+ continue
451
+ }
452
+ if err := client .KubernetesInterface ().CoreV1 ().Pods (sourceNamespace ).Delete (context .TODO (), pod .GetName (), metav1.DeleteOptions {
453
+ GracePeriodSeconds : ptr.To [int64 ](0 ),
454
+ }); err != nil && ! apierrors .IsNotFound (err ) {
455
+ forceDeletionErrs = append (forceDeletionErrs , err )
456
+ }
457
+ }
458
+ if len (forceDeletionErrs ) > 0 {
459
+ return false , errors .Join (forceDeletionErrs ... )
460
+ }
461
+ return livePodFound , nil
462
+ }
463
+
464
+ func isPodDead (pod * corev1.Pod ) bool {
465
+ for _ , check := range []func (* corev1.Pod ) bool {
466
+ isPodDeletedByTaintManager ,
467
+ } {
468
+ if check (pod ) {
469
+ return true
470
+ }
471
+ }
472
+ return false
473
+ }
474
+
475
+ func isPodDeletedByTaintManager (pod * corev1.Pod ) bool {
476
+ if pod .DeletionTimestamp == nil {
477
+ return false
478
+ }
479
+ for _ , condition := range pod .Status .Conditions {
480
+ if condition .Type == corev1 .DisruptionTarget && condition .Reason == "DeletionByTaintManager" && condition .Status == corev1 .ConditionTrue {
481
+ return true
482
+ }
483
+ }
484
+ return false
435
485
}
0 commit comments