Skip to content

Commit cabe200

Browse files
authored
Address Invalid Address in GRPC Catalogs (#2499)
Problem: Within the catalogSource resource, the RegistryServiceStatus stores service information that is used to generate an address that OLM relies on in order to establish a connection with the associated pod. If the RegistryServiceStatus is not nil and is missing the namespace, name, and port information for its service, OLM is unable to recover until the catalogSource's associated pod has an invalid image or spec. Solution: When reconciling a CatalogSource, OLM will now ensure that the RegistryServiceStatus of the catalogSource is valid and will update the catalogSource's status to reflect the change. Additionally, this address is stored in the catalogSource's status.GRPCConnectionState.Address field. If the address changes, OLM will update this field to reflect the new address as well. Signed-off-by: Alexander Greene <[email protected]>
1 parent 52f368d commit cabe200

File tree

4 files changed

+128
-12
lines changed

4 files changed

+128
-12
lines changed

pkg/controller/operators/catalog/operator.go

+1
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ func (o *Operator) syncConnection(logger *logrus.Entry, in *v1alpha1.CatalogSour
803803
// Set connection status and return.
804804
out.Status.GRPCConnectionState.LastConnectTime = now
805805
out.Status.GRPCConnectionState.LastObservedState = source.ConnectionState.String()
806+
out.Status.GRPCConnectionState.Address = source.Address
806807
}
807808

808809
return

pkg/controller/operators/catalog/operator_test.go

+80-9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned/fake"
5050
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
5151
olmerrors "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/errors"
52+
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
5253
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
5354
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
5455
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver"
@@ -758,6 +759,12 @@ func TestExecutePlanDynamicResources(t *testing.T) {
758759
}
759760
}
760761

762+
func withStatus(catalogSource v1alpha1.CatalogSource, status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
763+
copy := catalogSource.DeepCopy()
764+
copy.Status = status
765+
return copy
766+
}
767+
761768
func TestSyncCatalogSources(t *testing.T) {
762769
clockFake := utilclock.NewFakeClock(time.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC))
763770
now := metav1.NewTime(clockFake.Now())
@@ -786,14 +793,15 @@ func TestSyncCatalogSources(t *testing.T) {
786793
},
787794
}
788795
tests := []struct {
789-
testName string
790-
namespace string
791-
catalogSource *v1alpha1.CatalogSource
792-
k8sObjs []runtime.Object
793-
configMap *corev1.ConfigMap
794-
expectedStatus *v1alpha1.CatalogSourceStatus
795-
expectedObjs []runtime.Object
796-
expectedError error
796+
testName string
797+
namespace string
798+
catalogSource *v1alpha1.CatalogSource
799+
k8sObjs []runtime.Object
800+
configMap *corev1.ConfigMap
801+
expectedStatus *v1alpha1.CatalogSourceStatus
802+
expectedObjs []runtime.Object
803+
expectedError error
804+
existingSources []sourceAddress
797805
}{
798806
{
799807
testName: "CatalogSourceWithInvalidSourceType",
@@ -1013,6 +1021,47 @@ func TestSyncCatalogSources(t *testing.T) {
10131021
},
10141022
expectedError: nil,
10151023
},
1024+
{
1025+
testName: "GRPCConnectionStateAddressIsUpdated",
1026+
namespace: "cool-namespace",
1027+
catalogSource: withStatus(*grpcCatalog, v1alpha1.CatalogSourceStatus{
1028+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
1029+
Protocol: "grpc",
1030+
ServiceName: "cool-catalog",
1031+
ServiceNamespace: "cool-namespace",
1032+
Port: "50051",
1033+
CreatedAt: now,
1034+
},
1035+
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
1036+
Address: "..svc:", // Needs to be updated to cool-catalog.cool-namespace.svc:50051
1037+
},
1038+
}),
1039+
k8sObjs: []runtime.Object{
1040+
pod(*grpcCatalog),
1041+
service(grpcCatalog.GetName(), grpcCatalog.GetNamespace()),
1042+
},
1043+
existingSources: []sourceAddress{
1044+
{
1045+
sourceKey: registry.CatalogKey{Name: "cool-catalog", Namespace: "cool-namespace"},
1046+
address: "cool-catalog.cool-namespace.svc:50051",
1047+
},
1048+
},
1049+
expectedStatus: &v1alpha1.CatalogSourceStatus{
1050+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
1051+
Protocol: "grpc",
1052+
ServiceName: "cool-catalog",
1053+
ServiceNamespace: "cool-namespace",
1054+
Port: "50051",
1055+
CreatedAt: now,
1056+
},
1057+
GRPCConnectionState: &v1alpha1.GRPCConnectionState{
1058+
Address: "cool-catalog.cool-namespace.svc:50051",
1059+
LastObservedState: "",
1060+
LastConnectTime: now,
1061+
},
1062+
},
1063+
expectedError: nil,
1064+
},
10161065
}
10171066
for _, tt := range tests {
10181067
t.Run(tt.testName, func(t *testing.T) {
@@ -1023,7 +1072,7 @@ func TestSyncCatalogSources(t *testing.T) {
10231072
ctx, cancel := context.WithCancel(context.TODO())
10241073
defer cancel()
10251074

1026-
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...))
1075+
op, err := NewFakeOperator(ctx, tt.namespace, []string{tt.namespace}, withClock(clockFake), withClientObjs(clientObjs...), withK8sObjs(tt.k8sObjs...), withSources(tt.existingSources...))
10271076
require.NoError(t, err)
10281077

10291078
// Run sync
@@ -1040,6 +1089,13 @@ func TestSyncCatalogSources(t *testing.T) {
10401089
require.NotEmpty(t, updated)
10411090

10421091
if tt.expectedStatus != nil {
1092+
if tt.expectedStatus.GRPCConnectionState != nil {
1093+
updated.Status.GRPCConnectionState.LastConnectTime = now
1094+
// Ignore LastObservedState difference if an expected LastObservedState is not provided
1095+
if tt.expectedStatus.GRPCConnectionState.LastObservedState == "" {
1096+
updated.Status.GRPCConnectionState.LastObservedState = ""
1097+
}
1098+
}
10431099
require.NotEmpty(t, updated.Status)
10441100
require.Equal(t, *tt.expectedStatus, updated.Status)
10451101

@@ -1384,6 +1440,7 @@ type fakeOperatorConfig struct {
13841440
resolver resolver.StepResolver
13851441
recorder record.EventRecorder
13861442
reconciler reconciler.RegistryReconcilerFactory
1443+
sources []sourceAddress
13871444
}
13881445

13891446
// fakeOperatorOption applies an option to the given fake operator configuration.
@@ -1395,6 +1452,12 @@ func withResolver(res resolver.StepResolver) fakeOperatorOption {
13951452
}
13961453
}
13971454

1455+
func withSources(sources ...sourceAddress) fakeOperatorOption {
1456+
return func(config *fakeOperatorConfig) {
1457+
config.sources = sources
1458+
}
1459+
}
1460+
13981461
func withReconciler(rec reconciler.RegistryReconcilerFactory) fakeOperatorOption {
13991462
return func(config *fakeOperatorConfig) {
14001463
config.reconciler = rec
@@ -1431,6 +1494,11 @@ func withFakeClientOptions(options ...clientfake.Option) fakeOperatorOption {
14311494
}
14321495
}
14331496

1497+
type sourceAddress struct {
1498+
address string
1499+
sourceKey registry.CatalogKey
1500+
}
1501+
14341502
// NewFakeOperator creates a new operator using fake clients.
14351503
func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, fakeOptions ...fakeOperatorOption) (*Operator, error) {
14361504
// Apply options to default config
@@ -1548,6 +1616,9 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
15481616

15491617
op.RunInformers(ctx)
15501618
op.sources.Start(ctx)
1619+
for _, source := range config.sources {
1620+
op.sources.Add(source.sourceKey, source.address)
1621+
}
15511622

15521623
if ok := cache.WaitForCacheSync(ctx.Done(), op.HasSynced); !ok {
15531624
return nil, fmt.Errorf("failed to wait for caches to sync")

pkg/controller/registry/reconciler/grpc.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
191191
source := grpcCatalogSourceDecorator{catalogSource}
192192

193193
// if service status is nil or no longer valid (see isRegistryServiceStatusValid), we force create every object to ensure they're (re)created
194-
overwrite := source.Status.RegistryServiceStatus == nil
194+
overwrite := source.Status.RegistryServiceStatus == nil || !isRegistryServiceStatusValid(&source)
195195

196196
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
197197
sa, err := c.ensureSA(source)
@@ -216,17 +216,33 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
216216

217217
if overwritePod {
218218
now := c.now()
219+
service := source.Service()
219220
catalogSource.Status.RegistryServiceStatus = &v1alpha1.RegistryServiceStatus{
220221
CreatedAt: now,
221222
Protocol: "grpc",
222-
ServiceName: source.Service().GetName(),
223+
ServiceName: service.GetName(),
223224
ServiceNamespace: source.GetNamespace(),
224-
Port: fmt.Sprintf("%d", source.Service().Spec.Ports[0].Port),
225+
Port: getPort(service),
225226
}
226227
}
227228
return nil
228229
}
229230

231+
func getPort(service *corev1.Service) string {
232+
return fmt.Sprintf("%d", service.Spec.Ports[0].Port)
233+
}
234+
235+
func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) bool {
236+
service := source.Service()
237+
if source.Status.RegistryServiceStatus.ServiceName != service.GetName() ||
238+
source.Status.RegistryServiceStatus.ServiceNamespace != service.GetNamespace() ||
239+
source.Status.RegistryServiceStatus.Port != getPort(service) ||
240+
source.Status.RegistryServiceStatus.Protocol != "grpc" {
241+
return false
242+
}
243+
return true
244+
}
245+
230246
func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, saName string, overwrite bool) error {
231247
// currentLivePods refers to the currently live instances of the catalog source
232248
currentLivePods := c.currentPods(source)

pkg/controller/registry/reconciler/grpc_test.go

+28
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ func grpcCatalogSourceWithSecret(secretNames []string) *v1alpha1.CatalogSource {
4949
},
5050
}
5151
}
52+
func grpcCatalogSourceWithStatus(status v1alpha1.CatalogSourceStatus) *v1alpha1.CatalogSource {
53+
catsrc := validGrpcCatalogSource("image", "")
54+
catsrc.Status = status
55+
return catsrc
56+
}
5257

5358
func grpcCatalogSourceWithAnnotations(annotations map[string]string) *v1alpha1.CatalogSource {
5459
catsrc := validGrpcCatalogSource("image", "")
@@ -284,6 +289,29 @@ func TestGrpcRegistryReconciler(t *testing.T) {
284289
},
285290
},
286291
},
292+
{
293+
testName: "Grpc/ExistingRegistry/UpdateInvalidRegistryServiceStatus",
294+
in: in{
295+
cluster: cluster{
296+
k8sObjs: objectsForCatalogSource(validGrpcCatalogSource("image", "")),
297+
},
298+
catsrc: grpcCatalogSourceWithStatus(v1alpha1.CatalogSourceStatus{
299+
RegistryServiceStatus: &v1alpha1.RegistryServiceStatus{
300+
CreatedAt: now(),
301+
Protocol: "grpc",
302+
},
303+
}),
304+
},
305+
out: out{
306+
status: &v1alpha1.RegistryServiceStatus{
307+
CreatedAt: now(),
308+
Protocol: "grpc",
309+
ServiceName: "img-catalog",
310+
ServiceNamespace: testNamespace,
311+
Port: "50051",
312+
},
313+
},
314+
},
287315
}
288316
for _, tt := range tests {
289317
t.Run(tt.testName, func(t *testing.T) {

0 commit comments

Comments
 (0)