Skip to content

Commit 1ef204a

Browse files
committed
add controller to update apiservices based on tpr
1 parent b34f03e commit 1ef204a

File tree

3 files changed

+384
-1
lines changed

3 files changed

+384
-1
lines changed

pkg/master/thirdparty/BUILD

+29-1
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,32 @@ licenses(["notice"])
55
load(
66
"@io_bazel_rules_go//go:def.bzl",
77
"go_library",
8+
"go_test",
89
)
910

1011
go_library(
1112
name = "go_default_library",
12-
srcs = ["thirdparty.go"],
13+
srcs = [
14+
"thirdparty.go",
15+
"tprregistration_controller.go",
16+
],
1317
tags = ["automanaged"],
1418
deps = [
1519
"//pkg/api:go_default_library",
1620
"//pkg/apis/extensions:go_default_library",
21+
"//pkg/client/informers/informers_generated/internalversion/extensions/internalversion:go_default_library",
22+
"//pkg/client/listers/extensions/internalversion:go_default_library",
1723
"//pkg/registry/extensions/rest:go_default_library",
1824
"//pkg/registry/extensions/thirdpartyresourcedata:go_default_library",
1925
"//pkg/registry/extensions/thirdpartyresourcedata/storage:go_default_library",
2026
"//vendor:github.com/golang/glog",
2127
"//vendor:k8s.io/apimachinery/pkg/api/meta",
2228
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
29+
"//vendor:k8s.io/apimachinery/pkg/labels",
2330
"//vendor:k8s.io/apimachinery/pkg/runtime",
2431
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
32+
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
33+
"//vendor:k8s.io/apimachinery/pkg/util/wait",
2534
"//vendor:k8s.io/apiserver/pkg/endpoints",
2635
"//vendor:k8s.io/apiserver/pkg/endpoints/handlers",
2736
"//vendor:k8s.io/apiserver/pkg/endpoints/request",
@@ -30,6 +39,9 @@ go_library(
3039
"//vendor:k8s.io/apiserver/pkg/server",
3140
"//vendor:k8s.io/apiserver/pkg/server/storage",
3241
"//vendor:k8s.io/apiserver/pkg/storage/storagebackend",
42+
"//vendor:k8s.io/client-go/tools/cache",
43+
"//vendor:k8s.io/client-go/util/workqueue",
44+
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
3345
],
3446
)
3547

@@ -45,3 +57,19 @@ filegroup(
4557
srcs = [":package-srcs"],
4658
tags = ["automanaged"],
4759
)
60+
61+
go_test(
62+
name = "go_default_test",
63+
srcs = ["tprregistration_controller_test.go"],
64+
library = ":go_default_library",
65+
tags = ["automanaged"],
66+
deps = [
67+
"//pkg/apis/extensions:go_default_library",
68+
"//pkg/client/listers/extensions/internalversion:go_default_library",
69+
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
70+
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
71+
"//vendor:k8s.io/client-go/tools/cache",
72+
"//vendor:k8s.io/client-go/util/workqueue",
73+
"//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration",
74+
],
75+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
Copyright 2017 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package thirdparty
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"github.com/golang/glog"
24+
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/labels"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
28+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29+
"k8s.io/apimachinery/pkg/util/wait"
30+
"k8s.io/client-go/tools/cache"
31+
"k8s.io/client-go/util/workqueue"
32+
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
33+
"k8s.io/kubernetes/pkg/apis/extensions"
34+
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion"
35+
listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion"
36+
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
37+
)
38+
39+
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
40+
// adding and removing APIServices
41+
type AutoAPIServiceRegistration interface {
42+
// AddAPIServiceToSync adds an API service to auto-register.
43+
AddAPIServiceToSync(in *apiregistration.APIService)
44+
// RemoveAPIServiceToSync removes an API service to auto-register.
45+
RemoveAPIServiceToSync(name string)
46+
}
47+
48+
type tprRegistrationController struct {
49+
tprLister listers.ThirdPartyResourceLister
50+
tprSynced cache.InformerSynced
51+
52+
apiServiceRegistration AutoAPIServiceRegistration
53+
54+
syncHandler func(groupVersion schema.GroupVersion) error
55+
56+
// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
57+
// this is actually keyed by a groupVersion
58+
queue workqueue.RateLimitingInterface
59+
}
60+
61+
// NewAutoRegistrationController returns a controller which will register TPR GroupVersions with the auto APIService registration
62+
// controller so they automatically stay in sync.
63+
func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInformer, apiServiceRegistration AutoAPIServiceRegistration) *tprRegistrationController {
64+
c := &tprRegistrationController{
65+
tprLister: tprInformer.Lister(),
66+
tprSynced: tprInformer.Informer().HasSynced,
67+
apiServiceRegistration: apiServiceRegistration,
68+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"),
69+
}
70+
c.syncHandler = c.handleTPR
71+
72+
tprInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
73+
AddFunc: func(obj interface{}) {
74+
cast := obj.(*extensions.ThirdPartyResource)
75+
c.enqueueTPR(cast)
76+
},
77+
UpdateFunc: func(_, obj interface{}) {
78+
cast := obj.(*extensions.ThirdPartyResource)
79+
c.enqueueTPR(cast)
80+
},
81+
DeleteFunc: func(obj interface{}) {
82+
cast, ok := obj.(*extensions.ThirdPartyResource)
83+
if !ok {
84+
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
85+
if !ok {
86+
glog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
87+
return
88+
}
89+
cast, ok = tombstone.Obj.(*extensions.ThirdPartyResource)
90+
if !ok {
91+
glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
92+
return
93+
}
94+
}
95+
c.enqueueTPR(cast)
96+
},
97+
})
98+
99+
return c
100+
}
101+
102+
func (c *tprRegistrationController) Run(threadiness int, stopCh chan struct{}) {
103+
// don't let panics crash the process
104+
defer utilruntime.HandleCrash()
105+
// make sure the work queue is shutdown which will trigger workers to end
106+
defer c.queue.ShutDown()
107+
108+
glog.Infof("Starting tpr-autoregister controller")
109+
defer glog.Infof("Shutting down tpr-autoregister controller")
110+
111+
// wait for your secondary caches to fill before starting your work
112+
if !cache.WaitForCacheSync(stopCh, c.tprSynced) {
113+
return
114+
}
115+
116+
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers
117+
for i := 0; i < threadiness; i++ {
118+
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker
119+
// after one second
120+
go wait.Until(c.runWorker, time.Second, stopCh)
121+
}
122+
123+
// wait until we're told to stop
124+
<-stopCh
125+
}
126+
127+
func (c *tprRegistrationController) runWorker() {
128+
// hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work
129+
// available, so we don't worry about secondary waits
130+
for c.processNextWorkItem() {
131+
}
132+
}
133+
134+
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
135+
func (c *tprRegistrationController) processNextWorkItem() bool {
136+
// pull the next work item from queue. It should be a key we use to lookup something in a cache
137+
key, quit := c.queue.Get()
138+
if quit {
139+
return false
140+
}
141+
// you always have to indicate to the queue that you've completed a piece of work
142+
defer c.queue.Done(key)
143+
144+
// do your work on the key. This method will contains your "do stuff" logic
145+
err := c.syncHandler(key.(schema.GroupVersion))
146+
if err == nil {
147+
// if you had no error, tell the queue to stop tracking history for your key. This will
148+
// reset things like failure counts for per-item rate limiting
149+
c.queue.Forget(key)
150+
return true
151+
}
152+
153+
// there was a failure so be sure to report it. This method allows for pluggable error handling
154+
// which can be used for things like cluster-monitoring
155+
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
156+
// since we failed, we should requeue the item to work on later. This method will add a backoff
157+
// to avoid hotlooping on particular items (they're probably still not going to work right away)
158+
// and overall controller protection (everything I've done is broken, this controller needs to
159+
// calm down or it can starve other useful work) cases.
160+
c.queue.AddRateLimited(key)
161+
162+
return true
163+
}
164+
165+
func (c *tprRegistrationController) enqueueTPR(tpr *extensions.ThirdPartyResource) {
166+
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
167+
if err != nil {
168+
utilruntime.HandleError(err)
169+
return
170+
}
171+
for _, version := range tpr.Versions {
172+
c.queue.Add(schema.GroupVersion{Group: group, Version: version.Name})
173+
}
174+
}
175+
176+
func (c *tprRegistrationController) handleTPR(groupVersion schema.GroupVersion) error {
177+
// check all TPRs. There shouldn't that many, but if we have problems later we can index them
178+
tprs, err := c.tprLister.List(labels.Everything())
179+
if err != nil {
180+
return err
181+
}
182+
183+
found := false
184+
for _, tpr := range tprs {
185+
_, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr)
186+
if err != nil {
187+
return err
188+
}
189+
for _, version := range tpr.Versions {
190+
if version.Name == groupVersion.Version && group == groupVersion.Group {
191+
found = true
192+
break
193+
}
194+
}
195+
}
196+
197+
apiServiceName := groupVersion.Version + "." + groupVersion.Group
198+
199+
if !found {
200+
c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
201+
return nil
202+
}
203+
204+
c.apiServiceRegistration.AddAPIServiceToSync(&apiregistration.APIService{
205+
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
206+
Spec: apiregistration.APIServiceSpec{
207+
Group: groupVersion.Group,
208+
Version: groupVersion.Version,
209+
Priority: 500, // TPRs should have relatively low priority
210+
},
211+
})
212+
213+
return nil
214+
}

0 commit comments

Comments
 (0)