Skip to content

Commit 1730102

Browse files
committed
Use a Runner to watch for changes to the available Kubernetes APIs
1 parent 4faa994 commit 1730102

File tree

4 files changed

+167
-32
lines changed

4 files changed

+167
-32
lines changed

cmd/postgres-operator/main.go

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727

2828
"go.opentelemetry.io/otel"
2929
"k8s.io/apimachinery/pkg/util/validation"
30-
"k8s.io/client-go/discovery"
3130
"k8s.io/client-go/rest"
3231
"sigs.k8s.io/controller-runtime/pkg/healthz"
3332

@@ -157,6 +156,10 @@ func main() {
157156
// deprecation warnings when using an older version of a resource for backwards compatibility).
158157
rest.SetDefaultWarningHandler(rest.NoWarnings{})
159158

159+
apis, err := runtime.NewAPIDiscoveryRunner(cfg)
160+
assertNoError(err)
161+
assertNoError(apis.Read())
162+
160163
options, err := initManager()
161164
assertNoError(err)
162165

@@ -165,13 +168,17 @@ func main() {
165168
options.BaseContext = func() context.Context {
166169
ctx := context.Background()
167170
ctx = feature.NewContext(ctx, features)
171+
ctx = runtime.NewAPIContext(ctx, apis)
168172
return ctx
169173
}
170174

171175
mgr, err := runtime.NewManager(cfg, options)
172176
assertNoError(err)
177+
assertNoError(mgr.Add(apis))
173178

174-
openshift := isOpenshift(cfg)
179+
openshift := apis.Has(runtime.API{
180+
Group: "security.openshift.io", Kind: "SecurityContextConstraints",
181+
})
175182
if openshift {
176183
log.Info("detected OpenShift environment")
177184
}
@@ -275,33 +282,3 @@ func addControllersToManager(mgr runtime.Manager, openshift bool, log logging.Lo
275282
os.Exit(1)
276283
}
277284
}
278-
279-
func isOpenshift(cfg *rest.Config) bool {
280-
const sccGroupName, sccKind = "security.openshift.io", "SecurityContextConstraints"
281-
282-
client, err := discovery.NewDiscoveryClientForConfig(cfg)
283-
assertNoError(err)
284-
285-
groups, err := client.ServerGroups()
286-
if err != nil {
287-
assertNoError(err)
288-
}
289-
for _, g := range groups.Groups {
290-
if g.Name != sccGroupName {
291-
continue
292-
}
293-
for _, v := range g.Versions {
294-
resourceList, err := client.ServerResourcesForGroupVersion(v.GroupVersion)
295-
if err != nil {
296-
assertNoError(err)
297-
}
298-
for _, r := range resourceList.APIResources {
299-
if r.Kind == sccKind {
300-
return true
301-
}
302-
}
303-
}
304-
}
305-
306-
return false
307-
}

internal/controller/runtime/api_discovery.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,17 @@ package runtime
1616

1717
import (
1818
"context"
19+
"errors"
20+
"sync"
21+
"time"
1922

23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2024
"k8s.io/apimachinery/pkg/runtime/schema"
25+
"k8s.io/apimachinery/pkg/util/sets"
26+
"k8s.io/client-go/discovery"
27+
"k8s.io/client-go/rest"
28+
29+
"github.com/crunchydata/postgres-operator/internal/logging"
2130
)
2231

2332
// API is a combination of Group, Version, and Kind that can be used to check
@@ -73,6 +82,145 @@ func (s APISet) HasOne(api ...API) bool {
7382
return false
7483
}
7584

85+
type APIDiscoveryRunner struct {
86+
Client interface {
87+
ServerGroups() (*metav1.APIGroupList, error)
88+
ServerResourcesForGroupVersion(string) (*metav1.APIResourceList, error)
89+
}
90+
91+
refresh time.Duration
92+
93+
want []API
94+
have struct {
95+
sync.RWMutex
96+
APISet
97+
}
98+
}
99+
100+
// NewAPIDiscoveryRunner creates an [APIDiscoveryRunner] that periodically reads
101+
// what APIs are available in the Kubernetes at config.
102+
func NewAPIDiscoveryRunner(config *rest.Config) (*APIDiscoveryRunner, error) {
103+
dc, err := discovery.NewDiscoveryClientForConfig(config)
104+
105+
runner := &APIDiscoveryRunner{
106+
Client: dc,
107+
refresh: 10 * time.Minute,
108+
want: []API{
109+
{Group: "cert-manager.io", Kind: "Certificate"},
110+
{Group: "gateway.networking.k8s.io", Kind: "ReferenceGrant"},
111+
{Group: "security.openshift.io", Kind: "SecurityContextConstraints"},
112+
{Group: "snapshot.storage.k8s.io", Kind: "VolumeSnapshot"},
113+
{Group: "trust.cert-manager.io", Kind: "Bundle"},
114+
},
115+
}
116+
117+
return runner, err
118+
}
119+
120+
// NeedLeaderElection returns false so that r runs on any [manager.Manager],
121+
// regardless of which is elected leader in the Kubernetes namespace.
122+
func (r *APIDiscoveryRunner) NeedLeaderElection() bool { return false }
123+
124+
// Read fetches available APIs from Kubernetes.
125+
func (r *APIDiscoveryRunner) Read() error {
126+
127+
// Build an index of the APIs we want to know about.
128+
wantAPIs := make(map[string]map[string]sets.Set[string])
129+
for _, want := range r.want {
130+
if wantAPIs[want.Group] == nil {
131+
wantAPIs[want.Group] = make(map[string]sets.Set[string])
132+
}
133+
if wantAPIs[want.Group][want.Version] == nil {
134+
wantAPIs[want.Group][want.Version] = sets.New[string]()
135+
}
136+
if want.Kind != "" {
137+
wantAPIs[want.Group][want.Version].Insert(want.Kind)
138+
}
139+
}
140+
141+
// Fetch Groups and Versions from Kubernetes.
142+
groups, err := r.Client.ServerGroups()
143+
if err != nil {
144+
return err
145+
}
146+
147+
// Build an index of the Groups, GVs, GKs, and GVKs available in Kuberentes
148+
// that we want to know about.
149+
haveWantedAPIs := make(map[API]struct{})
150+
for _, apiG := range groups.Groups {
151+
var haveG string = apiG.Name
152+
haveWantedAPIs[API{Group: haveG}] = struct{}{}
153+
154+
for _, apiGV := range apiG.Versions {
155+
var haveV string = apiGV.Version
156+
haveWantedAPIs[API{Group: haveG, Version: haveV}] = struct{}{}
157+
158+
// Only fetch Resources when there are Kinds we want to know about.
159+
if wantAPIs[haveG][""].Len() == 0 && wantAPIs[haveG][haveV].Len() == 0 {
160+
continue
161+
}
162+
163+
resources, err := r.Client.ServerResourcesForGroupVersion(apiGV.GroupVersion)
164+
if err != nil {
165+
return err
166+
}
167+
168+
for _, apiR := range resources.APIResources {
169+
var haveK string = apiR.Kind
170+
haveWantedAPIs[API{Group: haveG, Kind: haveK}] = struct{}{}
171+
haveWantedAPIs[API{Group: haveG, Kind: haveK, Version: haveV}] = struct{}{}
172+
}
173+
}
174+
}
175+
176+
r.have.Lock()
177+
r.have.APISet = haveWantedAPIs
178+
r.have.Unlock()
179+
180+
return nil
181+
}
182+
183+
// Start periodically reads the Kuberentes API. It blocks until ctx is cancelled.
184+
func (r *APIDiscoveryRunner) Start(ctx context.Context) error {
185+
ticker := time.NewTicker(r.refresh)
186+
defer ticker.Stop()
187+
188+
log := logging.FromContext(ctx).WithValues("controller", "kubernetes")
189+
190+
for {
191+
select {
192+
case <-ticker.C:
193+
if err := r.Read(); err != nil {
194+
log.Error(err, "Unable to detect Kubernetes APIs")
195+
}
196+
case <-ctx.Done():
197+
// TODO(controller-runtime): Fixed in v0.19.0
198+
// https://github.com/kubernetes-sigs/controller-runtime/issues/1927
199+
if errors.Is(ctx.Err(), context.Canceled) {
200+
return nil
201+
}
202+
return ctx.Err()
203+
}
204+
}
205+
}
206+
207+
// Has returns true when api is available in Kuberentes.
208+
func (r *APIDiscoveryRunner) Has(api API) bool { return r.HasOne(api) }
209+
210+
// HasAll returns true when every api is available in Kubernetes.
211+
func (r *APIDiscoveryRunner) HasAll(api ...API) bool {
212+
r.have.RLock()
213+
defer r.have.RUnlock()
214+
return r.have.HasAll(api...)
215+
}
216+
217+
// HasOne returns true when at least one api is available in Kubernetes.
218+
func (r *APIDiscoveryRunner) HasOne(api ...API) bool {
219+
r.have.RLock()
220+
defer r.have.RUnlock()
221+
return r.have.HasOne(api...)
222+
}
223+
76224
type apiContextKey struct{}
77225

78226
// Kubernetes returns the APIs previously stored by [NewAPIContext].

internal/controller/runtime/api_discovery_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020

2121
"gotest.tools/v3/assert"
22+
"sigs.k8s.io/controller-runtime/pkg/manager"
2223
)
2324

2425
func TestAPISet(t *testing.T) {
@@ -74,3 +75,11 @@ func TestAPIContext(t *testing.T) {
7475
set[API{Group: "snapshot.storage.k8s.io"}] = struct{}{}
7576
assert.Assert(t, Kubernetes(ctx).Has(API{Group: "snapshot.storage.k8s.io"}))
7677
}
78+
79+
func TestAPIDiscoveryRunnerInterfaces(t *testing.T) {
80+
var _ APIs = new(APIDiscoveryRunner)
81+
var _ manager.Runnable = new(APIDiscoveryRunner)
82+
83+
var runnable manager.LeaderElectionRunnable = new(APIDiscoveryRunner)
84+
assert.Assert(t, false == runnable.NeedLeaderElection())
85+
}

internal/registration/runner.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ func (r *Runner) Start(ctx context.Context) error {
185185
r.changed()
186186
}
187187
case <-ctx.Done():
188+
// TODO(controller-runtime): Fixed in v0.19.0
188189
// https://github.com/kubernetes-sigs/controller-runtime/issues/1927
189190
if errors.Is(ctx.Err(), context.Canceled) {
190191
return nil

0 commit comments

Comments
 (0)