diff --git a/cmd/postgres-operator/main.go b/cmd/postgres-operator/main.go index 6522abed19..5076e76943 100644 --- a/cmd/postgres-operator/main.go +++ b/cmd/postgres-operator/main.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/otel" "k8s.io/apimachinery/pkg/util/validation" - "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -157,6 +156,10 @@ func main() { // deprecation warnings when using an older version of a resource for backwards compatibility). rest.SetDefaultWarningHandler(rest.NoWarnings{}) + apis, err := runtime.NewAPIDiscoveryRunner(cfg) + assertNoError(err) + assertNoError(apis.Read()) + options, err := initManager() assertNoError(err) @@ -165,13 +168,17 @@ func main() { options.BaseContext = func() context.Context { ctx := context.Background() ctx = feature.NewContext(ctx, features) + ctx = runtime.NewAPIContext(ctx, apis) return ctx } mgr, err := runtime.NewManager(cfg, options) assertNoError(err) + assertNoError(mgr.Add(apis)) - openshift := isOpenshift(cfg) + openshift := apis.Has(runtime.API{ + Group: "security.openshift.io", Kind: "SecurityContextConstraints", + }) if openshift { log.Info("detected OpenShift environment") } @@ -275,33 +282,3 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo os.Exit(1) } } - -func isOpenshift(cfg *rest.Config) bool { - const sccGroupName, sccKind = "security.openshift.io", "SecurityContextConstraints" - - client, err := discovery.NewDiscoveryClientForConfig(cfg) - assertNoError(err) - - groups, err := client.ServerGroups() - if err != nil { - assertNoError(err) - } - for _, g := range groups.Groups { - if g.Name != sccGroupName { - continue - } - for _, v := range g.Versions { - resourceList, err := client.ServerResourcesForGroupVersion(v.GroupVersion) - if err != nil { - assertNoError(err) - } - for _, r := range resourceList.APIResources { - if r.Kind == sccKind { - return true - } - } - } - } - - return false -} diff --git a/internal/controller/runtime/api_discovery.go b/internal/controller/runtime/api_discovery.go new file mode 100644 index 0000000000..ffb6e4663a --- /dev/null +++ b/internal/controller/runtime/api_discovery.go @@ -0,0 +1,238 @@ +// Copyright 2023 - 2024 Crunchy Data Solutions, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "context" + "errors" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" + + "github.com/crunchydata/postgres-operator/internal/logging" +) + +// API is a combination of Group, Version, and Kind that can be used to check +// what is available in the Kubernetes API. There are four ways to populate it: +// 1. Group without Version nor Kind means any resource in that Group. +// 2. Group with Version but no Kind means any resource in that GV. +// 3. Group with Kind but no Version means that Kind in any Version of the Group. +// 4. Group with Version and Kind means that exact GVK. +type API = schema.GroupVersionKind + +type APIs interface { + Has(API) bool + HasAll(...API) bool + HasOne(...API) bool +} + +// APISet implements [APIs] using empty struct for minimal memory consumption. +type APISet map[API]struct{} + +func NewAPISet(api ...API) APISet { + s := make(APISet) + + for i := range api { + s[api[i]] = struct{}{} + s[API{Group: api[i].Group}] = struct{}{} + s[API{Group: api[i].Group, Version: api[i].Version}] = struct{}{} + s[API{Group: api[i].Group, Kind: api[i].Kind}] = struct{}{} + } + + return s +} + +// Has returns true when api is available in s. +func (s APISet) Has(api API) bool { return s.HasOne(api) } + +// HasAll returns true when every api is available in s. +func (s APISet) HasAll(api ...API) bool { + for i := range api { + if _, present := s[api[i]]; !present { + return false + } + } + return true +} + +// HasOne returns true when at least one api is available in s. +func (s APISet) HasOne(api ...API) bool { + for i := range api { + if _, present := s[api[i]]; present { + return true + } + } + return false +} + +type APIDiscoveryRunner struct { + Client interface { + ServerGroups() (*metav1.APIGroupList, error) + ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error) + } + + refresh time.Duration + + want []API + have struct { + sync.RWMutex + APISet + } +} + +// NewAPIDiscoveryRunner creates an [APIDiscoveryRunner] that periodically reads +// what APIs are available in the Kubernetes at config. +func NewAPIDiscoveryRunner(config *rest.Config) (*APIDiscoveryRunner, error) { + dc, err := discovery.NewDiscoveryClientForConfig(config) + + runner := &APIDiscoveryRunner{ + Client: dc, + refresh: 10 * time.Minute, + want: []API{ + {Group: "cert-manager.io", Kind: "Certificate"}, + {Group: "gateway.networking.k8s.io", Kind: "ReferenceGrant"}, + {Group: "security.openshift.io", Kind: "SecurityContextConstraints"}, + {Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"}, + {Group: "trust.cert-manager.io", Kind: "Bundle"}, + }, + } + + return runner, err +} + +// NeedLeaderElection returns false so that r runs on any [manager.Manager], +// regardless of which is elected leader in the Kubernetes namespace. +func (r *APIDiscoveryRunner) NeedLeaderElection() bool { return false } + +// Read fetches available APIs from Kubernetes. +func (r *APIDiscoveryRunner) Read() error { + + // Build an index of the APIs we want to know about. + wantAPIs := make(map[string]map[string]sets.Set[string]) + for _, want := range r.want { + if wantAPIs[want.Group] == nil { + wantAPIs[want.Group] = make(map[string]sets.Set[string]) + } + if wantAPIs[want.Group][want.Version] == nil { + wantAPIs[want.Group][want.Version] = sets.New[string]() + } + if want.Kind != "" { + wantAPIs[want.Group][want.Version].Insert(want.Kind) + } + } + + // Fetch Groups and Versions from Kubernetes. + groups, err := r.Client.ServerGroups() + if err != nil { + return err + } + + // Build an index of the Groups, GVs, GKs, and GVKs available in Kuberentes + // that we want to know about. + haveWantedAPIs := make(map[API]struct{}) + for _, apiG := range groups.Groups { + var haveG string = apiG.Name + haveWantedAPIs[API{Group: haveG}] = struct{}{} + + for _, apiGV := range apiG.Versions { + var haveV string = apiGV.Version + haveWantedAPIs[API{Group: haveG, Version: haveV}] = struct{}{} + + // Only fetch Resources when there are Kinds we want to know about. + if wantAPIs[haveG][""].Len() == 0 && wantAPIs[haveG][haveV].Len() == 0 { + continue + } + + resources, err := r.Client.ServerResourcesForGroupVersion(apiGV.GroupVersion) + if err != nil { + return err + } + + for _, apiR := range resources.APIResources { + var haveK string = apiR.Kind + haveWantedAPIs[API{Group: haveG, Kind: haveK}] = struct{}{} + haveWantedAPIs[API{Group: haveG, Kind: haveK, Version: haveV}] = struct{}{} + } + } + } + + r.have.Lock() + r.have.APISet = haveWantedAPIs + r.have.Unlock() + + return nil +} + +// Start periodically reads the Kuberentes API. It blocks until ctx is cancelled. +func (r *APIDiscoveryRunner) Start(ctx context.Context) error { + ticker := time.NewTicker(r.refresh) + defer ticker.Stop() + + log := logging.FromContext(ctx).WithValues("controller", "kubernetes") + + for { + select { + case <-ticker.C: + if err := r.Read(); err != nil { + log.Error(err, "Unable to detect Kubernetes APIs") + } + case <-ctx.Done(): + // TODO(controller-runtime): Fixed in v0.19.0 + // https://github.com/kubernetes-sigs/controller-runtime/issues/1927 + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return ctx.Err() + } + } +} + +// Has returns true when api is available in Kuberentes. +func (r *APIDiscoveryRunner) Has(api API) bool { return r.HasOne(api) } + +// HasAll returns true when every api is available in Kubernetes. +func (r *APIDiscoveryRunner) HasAll(api ...API) bool { + r.have.RLock() + defer r.have.RUnlock() + return r.have.HasAll(api...) +} + +// HasOne returns true when at least one api is available in Kubernetes. +func (r *APIDiscoveryRunner) HasOne(api ...API) bool { + r.have.RLock() + defer r.have.RUnlock() + return r.have.HasOne(api...) +} + +type apiContextKey struct{} + +// Kubernetes returns the APIs previously stored by [NewAPIContext]. +// When nothing was stored, it returns an empty [APISet]. +func Kubernetes(ctx context.Context) APIs { + if apis, ok := ctx.Value(apiContextKey{}).(APIs); ok { + return apis + } + return APISet{} +} + +// NewAPIContext returns a copy of ctx containing apis. Retrieve it using [Kubernetes]. +func NewAPIContext(ctx context.Context, apis APIs) context.Context { + return context.WithValue(ctx, apiContextKey{}, apis) +} diff --git a/internal/controller/runtime/api_discovery_test.go b/internal/controller/runtime/api_discovery_test.go new file mode 100644 index 0000000000..00959f28bc --- /dev/null +++ b/internal/controller/runtime/api_discovery_test.go @@ -0,0 +1,85 @@ +// Copyright 2023 - 2024 Crunchy Data Solutions, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "context" + "testing" + + "gotest.tools/v3/assert" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func TestAPISet(t *testing.T) { + t.Parallel() + + var zero APISet + assert.Assert(t, !zero.Has(API{Group: "security.openshift.io"})) + assert.Assert(t, !zero.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"})) + assert.Assert(t, !zero.HasAll(API{Group: "security.openshift.io"}, API{Group: "snapshot.storage.k8s.io"})) + assert.Assert(t, !zero.HasOne(API{Group: "security.openshift.io"}, API{Group: "snapshot.storage.k8s.io"})) + + empty := NewAPISet() + assert.Assert(t, !empty.Has(API{Group: "security.openshift.io"})) + assert.Assert(t, !empty.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"})) + + one := NewAPISet( + API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}, + ) + assert.Assert(t, one.Has(API{Group: "security.openshift.io"})) + assert.Assert(t, one.Has(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"})) + assert.Assert(t, !one.HasAll(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"})) + assert.Assert(t, !one.HasOne(API{Group: "snapshot.storage.k8s.io"})) + assert.Assert(t, one.HasOne(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"})) + + two := NewAPISet( + API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}, + API{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"}, + ) + assert.Assert(t, two.Has(API{Group: "security.openshift.io"})) + assert.Assert(t, two.Has(API{Group: "snapshot.storage.k8s.io"})) + assert.Assert(t, two.HasAll(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"})) + assert.Assert(t, two.HasOne(API{Group: "snapshot.storage.k8s.io"})) + assert.Assert(t, two.HasOne(API{Group: "snapshot.storage.k8s.io"}, API{Group: "security.openshift.io"})) +} + +func TestAPIContext(t *testing.T) { + t.Parallel() + + // The background context always return false. + ctx := context.Background() + + assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "security.openshift.io"})) + assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"})) + + // An initialized context returns what is stored. + set := NewAPISet(API{Group: "security.openshift.io", Kind: "SecurityContextConstraints"}) + ctx = NewAPIContext(ctx, set) + + assert.Assert(t, Kubernetes(ctx).Has(API{Group: "security.openshift.io"})) + assert.Assert(t, !Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"})) + + // The stored value is mutable within the context. + set[API{Group: "snapshot.storage.k8s.io"}] = struct{}{} + assert.Assert(t, Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"})) +} + +func TestAPIDiscoveryRunnerInterfaces(t *testing.T) { + var _ APIs = new(APIDiscoveryRunner) + var _ manager.Runnable = new(APIDiscoveryRunner) + + var runnable manager.LeaderElectionRunnable = new(APIDiscoveryRunner) + assert.Assert(t, false == runnable.NeedLeaderElection()) +} diff --git a/internal/registration/runner.go b/internal/registration/runner.go index e34412c07d..c8be37f2f9 100644 --- a/internal/registration/runner.go +++ b/internal/registration/runner.go @@ -185,6 +185,7 @@ func (r *Runner) Start(ctx context.Context) error { r.changed() } case <-ctx.Done(): + // TODO(controller-runtime): Fixed in v0.19.0 // https://github.com/kubernetes-sigs/controller-runtime/issues/1927 if errors.Is(ctx.Err(), context.Canceled) { return nil