Skip to content

Commit d4cd0ba

Browse files
mfojtiksoltysh
authored andcommitted
UPSTREAM: <carry>: patch aggregator to allow delegating resources
Origin-commit: 14ba1f8ece9a7bb00ececb2a35b5f8f5fbeacc83 UPSTREAM: <carry>: prevent apiservice registration by CRD controller when delegating Origin-commit: 3d216eab7adcbd8596606d72d31b6af621bfd350 UPSTREAM: <carry>: prevent CRD registration from fighting with APIServices Origin-commit: c1c87eeade4730a2271cb98b4c6ea16af07e3e68 UPSTREAM: <carry>: always delegate namespaced resources Origin-commit: 7f0815b5a88d57046a92fbdbc493bab2ad28a79c openshift-rebase(v1.24):source=f9a6b73ca78 openshift-rebase(v1.24):source=f9a6b73ca78 openshift-rebase(v1.24):source=f9a6b73ca78
1 parent 26005f1 commit d4cd0ba

File tree

4 files changed

+91
-12
lines changed

4 files changed

+91
-12
lines changed

pkg/controlplane/controller/crdregistration/crdregistration_controller.go

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"k8s.io/client-go/tools/cache"
3434
"k8s.io/client-go/util/workqueue"
3535
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
36+
"k8s.io/kube-aggregator/pkg/apiserver"
3637
)
3738

3839
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
@@ -193,6 +194,10 @@ func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResour
193194
func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
194195
apiServiceName := groupVersion.Version + "." + groupVersion.Group
195196

197+
if apiserver.APIServiceAlreadyExists(groupVersion) {
198+
return nil
199+
}
200+
196201
// check all CRDs. There shouldn't that many, but if we have problems later we can index them
197202
crds, err := c.crdLister.List(labels.Everything())
198203
if err != nil {

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

+28-12
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ type APIAggregator struct {
137137
// handledGroups are the groups that already have routes
138138
handledGroups sets.String
139139

140+
// handledAlwaysLocalDelegatePaths are the URL paths that already have routes registered
141+
handledAlwaysLocalDelegatePaths sets.String
142+
140143
// lister is used to add group handling for /apis/<group> aggregator lookups based on
141144
// controller state
142145
lister listers.APIServiceLister
@@ -204,18 +207,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
204207
}
205208

206209
s := &APIAggregator{
207-
GenericAPIServer: genericServer,
208-
delegateHandler: delegationTarget.UnprotectedHandler(),
209-
proxyTransport: c.ExtraConfig.ProxyTransport,
210-
proxyHandlers: map[string]*proxyHandler{},
211-
handledGroups: sets.String{},
212-
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
213-
APIRegistrationInformers: informerFactory,
214-
serviceResolver: c.ExtraConfig.ServiceResolver,
215-
openAPIConfig: c.GenericConfig.OpenAPIConfig,
216-
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
217-
egressSelector: c.GenericConfig.EgressSelector,
218-
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
210+
GenericAPIServer: genericServer,
211+
delegateHandler: delegationTarget.UnprotectedHandler(),
212+
proxyTransport: c.ExtraConfig.ProxyTransport,
213+
proxyHandlers: map[string]*proxyHandler{},
214+
handledGroups: sets.String{},
215+
handledAlwaysLocalDelegatePaths: sets.String{},
216+
lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
217+
APIRegistrationInformers: informerFactory,
218+
serviceResolver: c.ExtraConfig.ServiceResolver,
219+
openAPIConfig: c.GenericConfig.OpenAPIConfig,
220+
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
221+
egressSelector: c.GenericConfig.EgressSelector,
222+
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
219223
}
220224

221225
// used later to filter the served resource by those that have expired.
@@ -502,6 +506,18 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
502506
return nil
503507
}
504508

509+
// For some resources we always want to delegate to local API server.
510+
// These resources have to exists as CRD to be served locally.
511+
for _, alwaysLocalDelegatePath := range alwaysLocalDelegatePathPrefixes.List() {
512+
if s.handledAlwaysLocalDelegatePaths.Has(alwaysLocalDelegatePath) {
513+
continue
514+
}
515+
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(alwaysLocalDelegatePath, proxyHandler.localDelegate)
516+
// Always use local delegate for this prefix
517+
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(alwaysLocalDelegatePath+"/", proxyHandler.localDelegate)
518+
s.handledAlwaysLocalDelegatePaths.Insert(alwaysLocalDelegatePath)
519+
}
520+
505521
// it's time to register the group aggregation endpoint
506522
groupPath := "/apis/" + apiService.Spec.Group
507523
groupDiscoveryHandler := &apiGroupHandler{

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

+9
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/runtime/schema"
2829
"k8s.io/apimachinery/pkg/util/httpstream"
2930
utilnet "k8s.io/apimachinery/pkg/util/net"
3031
"k8s.io/apimachinery/pkg/util/proxy"
@@ -122,6 +123,14 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
122123
return
123124
}
124125

126+
// some groupResources should always be delegated
127+
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
128+
if alwaysLocalDelegateGroupResource[schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}] {
129+
r.localDelegate.ServeHTTP(w, req)
130+
return
131+
}
132+
}
133+
125134
if !handlingInfo.serviceAvailable {
126135
proxyError(w, req, "service unavailable", http.StatusServiceUnavailable)
127136
return
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package apiserver
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
"k8s.io/apimachinery/pkg/util/sets"
9+
)
10+
11+
// alwaysLocalDelegatePrefixes specify a list of API paths that we want to delegate to Kubernetes API server
12+
// instead of handling with OpenShift API server.
13+
var alwaysLocalDelegatePathPrefixes = sets.NewString()
14+
15+
// AddAlwaysLocalDelegateForPrefix will cause the given URL prefix always be served by local API server (kube apiserver).
16+
// This allows to move some resources from aggregated API server into CRD.
17+
func AddAlwaysLocalDelegateForPrefix(prefix string) {
18+
if alwaysLocalDelegatePathPrefixes.Has(prefix) {
19+
return
20+
}
21+
alwaysLocalDelegatePathPrefixes.Insert(prefix)
22+
}
23+
24+
var overlappingGroupVersion = map[schema.GroupVersion]bool{}
25+
26+
// AddOverlappingGroupVersion will stop the CRD registration controller from trying to manage an APIService.
27+
func AddOverlappingGroupVersion(groupVersion schema.GroupVersion) {
28+
overlappingGroupVersion[groupVersion] = true
29+
}
30+
31+
var alwaysLocalDelegateGroupResource = map[schema.GroupResource]bool{}
32+
33+
func AddAlwaysLocalDelegateGroupResource(groupResource schema.GroupResource) {
34+
alwaysLocalDelegateGroupResource[groupResource] = true
35+
}
36+
37+
func APIServiceAlreadyExists(groupVersion schema.GroupVersion) bool {
38+
if overlappingGroupVersion[groupVersion] {
39+
return true
40+
}
41+
42+
testPrefix := fmt.Sprintf("/apis/%s/%s/", groupVersion.Group, groupVersion.Version)
43+
for _, prefix := range alwaysLocalDelegatePathPrefixes.List() {
44+
if strings.HasPrefix(prefix, testPrefix) {
45+
return true
46+
}
47+
}
48+
return false
49+
}

0 commit comments

Comments
 (0)