Skip to content

Commit 2799d00

Browse files
mfojtikbertinatto
authored andcommitted
UPSTREAM: <carry>: patch aggregator to allow delegating resources
UPSTREAM: <carry>: prevent apiservice registration by CRD controller when delegating UPSTREAM: <carry>: prevent CRD registration from fighting with APIServices UPSTREAM: <carry>: always delegate namespaced resources OpenShift-Rebase-Source: d4cd0ba
1 parent 81374d8 commit 2799d00

File tree

4 files changed

+92
-13
lines changed

4 files changed

+92
-13
lines changed

pkg/controlplane/controller/crdregistration/crdregistration_controller.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/client-go/tools/cache"
3434
"k8s.io/client-go/util/workqueue"
3535
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
36+
"k8s.io/kube-aggregator/pkg/apiserver"
3637
)
3738

3839
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
@@ -196,6 +197,10 @@ func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResour
196197
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
197198
apiServiceName := groupVersion.Version + "." + groupVersion.Group
198199

200+
if apiserver.APIServiceAlreadyExists(groupVersion) {
201+
return nil
202+
}
203+
199204
// check all CRDs. There shouldn't that many, but if we have problems later we can index them
200205
crds, err := c.crdLister.List(labels.Everything())
201206
if err != nil {

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

+29-13
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ type APIAggregator struct {
160160
// is the versions for the group.
161161
handledGroupVersions map[string]sets.Set[string]
162162

163+
// handledAlwaysLocalDelegatePaths are the URL paths that already have routes registered
164+
handledAlwaysLocalDelegatePaths sets.String
165+
163166
// lister is used to add group handling for /apis/<group> aggregator lookups based on
164167
// controller state
165168
lister listers.APIServiceLister
@@ -245,19 +248,20 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
245248
}
246249

247250
s := &APIAggregator{
248-
GenericAPIServer: genericServer,
249-
delegateHandler: delegationTarget.UnprotectedHandler(),
250-
proxyTransportDial: proxyTransportDial,
251-
proxyHandlers: map[string]*proxyHandler{},
252-
handledGroupVersions: map[string]sets.Set[string]{},
253-
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
254-
APIRegistrationInformers: informerFactory,
255-
serviceResolver: c.ExtraConfig.ServiceResolver,
256-
openAPIConfig: c.GenericConfig.OpenAPIConfig,
257-
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
258-
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
259-
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
260-
tracerProvider: c.GenericConfig.TracerProvider,
251+
GenericAPIServer: genericServer,
252+
delegateHandler: delegationTarget.UnprotectedHandler(),
253+
proxyTransportDial: proxyTransportDial,
254+
proxyHandlers: map[string]*proxyHandler{},
255+
handledGroupVersions: map[string]sets.Set[string]{},
256+
handledAlwaysLocalDelegatePaths: sets.String{},
257+
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
258+
APIRegistrationInformers: informerFactory,
259+
serviceResolver: c.ExtraConfig.ServiceResolver,
260+
openAPIConfig: c.GenericConfig.OpenAPIConfig,
261+
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
262+
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
263+
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
264+
tracerProvider: c.GenericConfig.TracerProvider,
261265
}
262266

263267
// used later to filter the served resource by those that have expired.
@@ -618,6 +622,18 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
618622
return nil
619623
}
620624

625+
// For some resources we always want to delegate to local API server.
626+
// These resources have to exists as CRD to be served locally.
627+
for _, alwaysLocalDelegatePath := range alwaysLocalDelegatePathPrefixes.List() {
628+
if s.handledAlwaysLocalDelegatePaths.Has(alwaysLocalDelegatePath) {
629+
continue
630+
}
631+
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(alwaysLocalDelegatePath, proxyHandler.localDelegate)
632+
// Always use local delegate for this prefix
633+
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(alwaysLocalDelegatePath+"/", proxyHandler.localDelegate)
634+
s.handledAlwaysLocalDelegatePaths.Insert(alwaysLocalDelegatePath)
635+
}
636+
621637
// it's time to register the group aggregation endpoint
622638
groupPath := "/apis/" + apiService.Spec.Group
623639
groupDiscoveryHandler := &apiGroupHandler{

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

+9
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync/atomic"
2323

2424
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
2526
"k8s.io/apimachinery/pkg/util/httpstream"
2627
"k8s.io/apimachinery/pkg/util/proxy"
2728
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
@@ -119,6 +120,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
119120
return
120121
}
121122

123+
// some groupResources should always be delegated
124+
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
125+
if alwaysLocalDelegateGroupResource[schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}] {
126+
r.localDelegate.ServeHTTP(w, req)
127+
return
128+
}
129+
}
130+
122131
if !handlingInfo.serviceAvailable {
123132
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
124133
return
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package apiserver
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
"k8s.io/apimachinery/pkg/util/sets"
9+
)
10+
11+
// alwaysLocalDelegatePrefixes specify a list of API paths that we want to delegate to Kubernetes API server
12+
// instead of handling with OpenShift API server.
13+
var alwaysLocalDelegatePathPrefixes = sets.NewString()
14+
15+
// AddAlwaysLocalDelegateForPrefix will cause the given URL prefix always be served by local API server (kube apiserver).
16+
// This allows to move some resources from aggregated API server into CRD.
17+
func AddAlwaysLocalDelegateForPrefix(prefix string) {
18+
if alwaysLocalDelegatePathPrefixes.Has(prefix) {
19+
return
20+
}
21+
alwaysLocalDelegatePathPrefixes.Insert(prefix)
22+
}
23+
24+
var overlappingGroupVersion = map[schema.GroupVersion]bool{}
25+
26+
// AddOverlappingGroupVersion will stop the CRD registration controller from trying to manage an APIService.
27+
func AddOverlappingGroupVersion(groupVersion schema.GroupVersion) {
28+
overlappingGroupVersion[groupVersion] = true
29+
}
30+
31+
var alwaysLocalDelegateGroupResource = map[schema.GroupResource]bool{}
32+
33+
func AddAlwaysLocalDelegateGroupResource(groupResource schema.GroupResource) {
34+
alwaysLocalDelegateGroupResource[groupResource] = true
35+
}
36+
37+
func APIServiceAlreadyExists(groupVersion schema.GroupVersion) bool {
38+
if overlappingGroupVersion[groupVersion] {
39+
return true
40+
}
41+
42+
testPrefix := fmt.Sprintf("/apis/%s/%s/", groupVersion.Group, groupVersion.Version)
43+
for _, prefix := range alwaysLocalDelegatePathPrefixes.List() {
44+
if strings.HasPrefix(prefix, testPrefix) {
45+
return true
46+
}
47+
}
48+
return false
49+
}

0 commit comments

Comments
 (0)