@@ -14,9 +14,11 @@ import (
14
14
rbacv1 "k8s.io/api/rbac/v1"
15
15
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
16
16
k8serrors "k8s.io/apimachinery/pkg/api/errors"
17
+ "k8s.io/apimachinery/pkg/api/meta"
17
18
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
19
"k8s.io/apimachinery/pkg/labels"
19
20
"k8s.io/apimachinery/pkg/runtime"
21
+ "k8s.io/apimachinery/pkg/selection"
20
22
utilclock "k8s.io/apimachinery/pkg/util/clock"
21
23
utilerrors "k8s.io/apimachinery/pkg/util/errors"
22
24
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -51,6 +53,10 @@ import (
51
53
"github.com/operator-framework/operator-lifecycle-manager/pkg/metrics"
52
54
)
53
55
56
+ const (
57
+ copiedCSVsDisabledReason = "CopiedCSVsDisabled"
58
+ )
59
+
54
60
var (
55
61
ErrRequirementsNotMet = errors .New ("requirements were not met" )
56
62
ErrCRDOwnerConflict = errors .New ("conflicting CRD owner in namespace" )
@@ -68,6 +74,7 @@ type Operator struct {
68
74
copiedCSVLister operatorsv1alpha1listers.ClusterServiceVersionLister
69
75
ogQueueSet * queueinformer.ResourceQueueSet
70
76
csvQueueSet * queueinformer.ResourceQueueSet
77
+ olmConfigQueue workqueue.RateLimitingInterface
71
78
csvCopyQueueSet * queueinformer.ResourceQueueSet
72
79
copiedCSVGCQueueSet * queueinformer.ResourceQueueSet
73
80
objGCQueueSet * queueinformer.ResourceQueueSet
@@ -124,6 +131,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
124
131
client : config .externalClient ,
125
132
ogQueueSet : queueinformer .NewEmptyResourceQueueSet (),
126
133
csvQueueSet : queueinformer .NewEmptyResourceQueueSet (),
134
+ olmConfigQueue : workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "olmConfig" ),
127
135
csvCopyQueueSet : queueinformer .NewEmptyResourceQueueSet (),
128
136
copiedCSVGCQueueSet : queueinformer .NewEmptyResourceQueueSet (),
129
137
objGCQueueSet : queueinformer .NewEmptyResourceQueueSet (),
@@ -251,6 +259,26 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
251
259
return nil , err
252
260
}
253
261
262
+ // Register QueueInformer for olmConfig
263
+ olmConfigInformer := externalversions .NewSharedInformerFactoryWithOptions (
264
+ op .client ,
265
+ config .resyncPeriod (),
266
+ ).Operators ().V1 ().OLMConfigs ().Informer ()
267
+ olmConfigQueueInformer , err := queueinformer .NewQueueInformer (
268
+ ctx ,
269
+ queueinformer .WithInformer (olmConfigInformer ),
270
+ queueinformer .WithLogger (op .logger ),
271
+ queueinformer .WithQueue (op .olmConfigQueue ),
272
+ queueinformer .WithIndexer (olmConfigInformer .GetIndexer ()),
273
+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncOLMConfig ).ToSyncer ()),
274
+ )
275
+ if err != nil {
276
+ return nil , err
277
+ }
278
+ if err := op .RegisterQueueInformer (olmConfigQueueInformer ); err != nil {
279
+ return nil , err
280
+ }
281
+
254
282
// Wire OperatorGroup reconciliation
255
283
extInformerFactory := externalversions .NewSharedInformerFactoryWithOptions (op .client , config .resyncPeriod (), externalversions .WithNamespace (namespace ))
256
284
operatorGroupInformer := extInformerFactory .Operators ().V1 ().OperatorGroups ()
@@ -1211,6 +1239,99 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
1211
1239
return
1212
1240
}
1213
1241
1242
+ func isNamespaceClusterScopedMap (operatorGroups ... * v1.OperatorGroup ) map [string ]bool {
1243
+ result := map [string ]bool {}
1244
+ for _ , operatorGroup := range operatorGroups {
1245
+ result [operatorGroup .GetNamespace ()] = NewNamespaceSet (operatorGroup .Status .Namespaces ).IsAllNamespaces ()
1246
+ }
1247
+ return result
1248
+ }
1249
+
1250
+ func (a * Operator ) syncOLMConfig (obj interface {}) (syncError error ) {
1251
+ a .logger .Info ("Processing olmConfig" )
1252
+ olmConfig , ok := obj .(* v1.OLMConfig )
1253
+ if ! ok {
1254
+ return fmt .Errorf ("casting OLMConfig failed" )
1255
+ }
1256
+
1257
+ // Generate a map to track namespaces that are cluster scoped
1258
+ operatorGroups , err := a .lister .OperatorsV1 ().OperatorGroupLister ().List (labels .Everything ())
1259
+ if err != nil {
1260
+ return err
1261
+ }
1262
+
1263
+ isNamespaceClusterScoped := isNamespaceClusterScopedMap (operatorGroups ... )
1264
+
1265
+ // Get non-copied CSVs
1266
+ requirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .DoesNotExist , []string {})
1267
+ if err != nil {
1268
+ return err
1269
+ }
1270
+
1271
+ csvs , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().List (labels .NewSelector ().Add (* requirement ))
1272
+ if err != nil {
1273
+ return err
1274
+ }
1275
+
1276
+ csvIsRequeued := false
1277
+ for _ , csv := range csvs {
1278
+ // For each cluster scope installation
1279
+ if ! isNamespaceClusterScoped [csv .GetNamespace ()] {
1280
+ continue
1281
+ }
1282
+
1283
+ // Get a count of the number of CSVS
1284
+ requirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .Equals , []string {csv .GetNamespace ()})
1285
+ if err != nil {
1286
+ return err
1287
+ }
1288
+
1289
+ copiedCSVs , err := a .copiedCSVLister .List (labels .NewSelector ().Add (* requirement ))
1290
+ if err != nil {
1291
+ return err
1292
+ }
1293
+
1294
+ // If the correct number of copied CSVs were found, continue
1295
+ if len (copiedCSVs ) == 0 == ! olmConfig .CopiedCSVsAreEnabled () {
1296
+ continue
1297
+ }
1298
+
1299
+ // There were an incorrect number of copied CSVs, requeue the original CSV.
1300
+ if err := a .csvQueueSet .Requeue (csv .GetNamespace (), csv .GetName ()); err != nil {
1301
+ return err
1302
+ }
1303
+ csvIsRequeued = true
1304
+ }
1305
+ condition := metav1.Condition {
1306
+ Reason : "CopiedCSVCountCorrect" ,
1307
+ Status : metav1 .ConditionTrue ,
1308
+ Message : "Correct Number of copiedCSVs created" ,
1309
+ ObservedGeneration : olmConfig .GetGeneration (),
1310
+ Type : "ready" ,
1311
+ LastTransitionTime : metav1 .Now (),
1312
+ }
1313
+
1314
+ // If a CSV was requeued, reflect that the cluster is not yet in the expect state.
1315
+ if csvIsRequeued {
1316
+ condition .Message = "At least one CSV had an unexpected number of copied CSVs"
1317
+ condition .Status = metav1 .ConditionFalse
1318
+ defer func () {
1319
+ a .olmConfigQueue .AddAfter (olmConfig , time .Second * 5 )
1320
+ }()
1321
+ }
1322
+
1323
+ // Update the olmConfig status if it has changed.
1324
+ if ! meta .IsStatusConditionPresentAndEqual (olmConfig .Status .Conditions , condition .Type , condition .Status ) {
1325
+ a .logger .Infof ("Updating Condition: %v" , condition )
1326
+ meta .SetStatusCondition (& olmConfig .Status .Conditions , condition )
1327
+ if _ , err := a .client .OperatorsV1 ().OLMConfigs ().UpdateStatus (context .TODO (), olmConfig , metav1.UpdateOptions {}); err != nil {
1328
+ return err
1329
+ }
1330
+ }
1331
+
1332
+ return nil
1333
+ }
1334
+
1214
1335
func (a * Operator ) syncCopyCSV (obj interface {}) (syncError error ) {
1215
1336
clusterServiceVersion , ok := obj .(* v1alpha1.ClusterServiceVersion )
1216
1337
if ! ok {
@@ -1239,15 +1360,145 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
1239
1360
"targetNamespaces" : strings .Join (operatorGroup .Status .Namespaces , "," ),
1240
1361
}).Debug ("copying csv to targets" )
1241
1362
1363
+ copiedCSVsAreEnabled , err := a .copiedCSVsAreEnabled ()
1364
+ if err != nil {
1365
+ return err
1366
+ }
1367
+
1242
1368
// Check if we need to do any copying / annotation for the operatorgroup
1243
- if err := a .ensureCSVsInNamespaces (clusterServiceVersion , operatorGroup , NewNamespaceSet (operatorGroup .Status .Namespaces )); err != nil {
1244
- logger .WithError (err ).Info ("couldn't copy CSV to target namespaces" )
1245
- syncError = err
1369
+ namespaceSet := NewNamespaceSet (operatorGroup .Status .Namespaces )
1370
+ if copiedCSVsAreEnabled || ! namespaceSet .IsAllNamespaces () {
1371
+ if err := a .ensureCSVsInNamespaces (clusterServiceVersion , operatorGroup , namespaceSet ); err != nil {
1372
+ logger .WithError (err ).Info ("couldn't copy CSV to target namespaces" )
1373
+ syncError = err
1374
+ }
1375
+
1376
+ // If the CSV was installed in AllNamespace mode, remove any "CSV Copying Disabled" events
1377
+ // in which the related object's name, namespace, and uid match the given CSV's.
1378
+ if namespaceSet .IsAllNamespaces () {
1379
+ if err := a .deleteCSVCopyingDisabledEvent (clusterServiceVersion ); err != nil {
1380
+ return err
1381
+ }
1382
+ }
1383
+ return
1384
+ }
1385
+
1386
+ requirement , err := labels .NewRequirement (v1alpha1 .CopiedLabelKey , selection .Equals , []string {clusterServiceVersion .Namespace })
1387
+ if err != nil {
1388
+ return err
1389
+ }
1390
+
1391
+ copiedCSVs , err := a .copiedCSVLister .List (labels .NewSelector ().Add (* requirement ))
1392
+ if err != nil {
1393
+ return err
1394
+ }
1395
+
1396
+ for _ , copiedCSV := range copiedCSVs {
1397
+ err := a .client .OperatorsV1alpha1 ().ClusterServiceVersions (copiedCSV .Namespace ).Delete (context .TODO (), copiedCSV .Name , metav1.DeleteOptions {})
1398
+ if err != nil && ! k8serrors .IsNotFound (err ) {
1399
+ return err
1400
+ }
1401
+ }
1402
+
1403
+ if err := a .createCSVCopyingDisabledEvent (clusterServiceVersion ); err != nil {
1404
+ return err
1246
1405
}
1247
1406
1248
1407
return
1249
1408
}
1250
1409
1410
+ // copiedCSVsAreEnabled determines if csv copying is enabled for OLM.
1411
+ //
1412
+ // This method will first attempt to get the "cluster" olmConfig resource,
1413
+ // if any error other than "IsNotFound" is encountered, false and the error
1414
+ // will be returned.
1415
+ //
1416
+ // If the "cluster" olmConfig resource is found, the value of
1417
+ // olmConfig.spec.features.disableCopiedCSVs will be returned along with a
1418
+ // nil error.
1419
+ //
1420
+ // If the "cluster" olmConfig resource is not found, true will be returned
1421
+ // without an error.
1422
+ func (a * Operator ) copiedCSVsAreEnabled () (bool , error ) {
1423
+ olmConfig , err := a .client .OperatorsV1 ().OLMConfigs ().Get (context .TODO (), "cluster" , metav1.GetOptions {})
1424
+ if err != nil {
1425
+ // Default to true if olmConfig singleton cannot be found
1426
+ if k8serrors .IsNotFound (err ) {
1427
+ return true , nil
1428
+ }
1429
+ // If there was an error that wasn't an IsNotFound, return the error
1430
+ return false , err
1431
+ }
1432
+
1433
+ // If there was no error, return value based on olmConfig singleton
1434
+ return olmConfig .CopiedCSVsAreEnabled (), nil
1435
+ }
1436
+
1437
+ func (a * Operator ) getCopiedCSVDisabledEventsForCSV (csv * v1alpha1.ClusterServiceVersion ) ([]corev1.Event , error ) {
1438
+ result := []corev1.Event {}
1439
+ if csv == nil {
1440
+ return result , nil
1441
+ }
1442
+
1443
+ events , err := a .opClient .KubernetesInterface ().CoreV1 ().Events (csv .GetNamespace ()).List (context .TODO (), metav1.ListOptions {})
1444
+ if err != nil {
1445
+ return nil , err
1446
+ }
1447
+
1448
+ for _ , event := range events .Items {
1449
+ if event .InvolvedObject .Namespace == csv .GetNamespace () &&
1450
+ event .InvolvedObject .Name == csv .GetName () &&
1451
+ event .InvolvedObject .UID == csv .GetUID () &&
1452
+ event .Reason == copiedCSVsDisabledReason {
1453
+ result = append (result , event )
1454
+ }
1455
+ }
1456
+
1457
+ return result , nil
1458
+ }
1459
+
1460
+ func (a * Operator ) deleteCSVCopyingDisabledEvent (csv * v1alpha1.ClusterServiceVersion ) error {
1461
+ events , err := a .getCopiedCSVDisabledEventsForCSV (csv )
1462
+ if err != nil {
1463
+ return err
1464
+ }
1465
+
1466
+ // Remove existing events.
1467
+ return a .deleteEvents (events )
1468
+ }
1469
+
1470
+ func (a * Operator ) deleteEvents (events []corev1.Event ) error {
1471
+ for _ , event := range events {
1472
+ err := a .opClient .KubernetesInterface ().EventsV1 ().Events (event .GetNamespace ()).Delete (context .TODO (), event .GetName (), metav1.DeleteOptions {})
1473
+ if err != nil && ! k8serrors .IsNotFound (err ) {
1474
+ return err
1475
+ }
1476
+ }
1477
+ return nil
1478
+ }
1479
+
1480
+ func (a * Operator ) createCSVCopyingDisabledEvent (csv * v1alpha1.ClusterServiceVersion ) error {
1481
+ events , err := a .getCopiedCSVDisabledEventsForCSV (csv )
1482
+ if err != nil {
1483
+ return err
1484
+ }
1485
+
1486
+ if len (events ) == 1 {
1487
+ return nil
1488
+ }
1489
+
1490
+ // Remove existing events.
1491
+ if len (events ) > 1 {
1492
+ if err := a .deleteEvents (events ); err != nil {
1493
+ return err
1494
+ }
1495
+ }
1496
+
1497
+ a .recorder .Eventf (csv , corev1 .EventTypeWarning , copiedCSVsDisabledReason , "CSV copying disabled for %s/%s" , csv .GetNamespace (), csv .GetName ())
1498
+
1499
+ return nil
1500
+ }
1501
+
1251
1502
func (a * Operator ) syncGcCsv (obj interface {}) (syncError error ) {
1252
1503
clusterServiceVersion , ok := obj .(* v1alpha1.ClusterServiceVersion )
1253
1504
if ! ok {
0 commit comments